上一章節聊到,他有三個重要的部分,【生產者】、【blocker(rabbit節點】、【消費者】 ,換言之,咱們保障了發送可靠性、存儲可靠性、消費可靠性,也就保證了消息可靠性。下面會出一個消息可靠性的方案,有時候咱們須要對一個超時訂單作處理,咱們可使用rabbit的死信隊列作這個,下面咱們也會聊聊死信隊列,已經對rabbit進行監控。html
通常消息可靠性分爲三個級別:node
【最多一次(rabbit支持)】:消息可能會丟失,可是不會重複傳輸。那須要->spring
- 消費者就須要開啓事務機制或者confirm機制(rabbit提供的,當消息發送到消費者的時候,生產端有會收到異步回調,從而知道正確投遞),以此保障能夠傳遞到mq中
- 消費者須要備份交換機,肯定消息能夠從交換機路由到隊列中,這樣就可讓消息不丟失
- 或者經過mandatory屬性(rabbit提供,當消息沒法找到相關的隊列,那就mq就返回沒有投遞成功的消息給生產者)
【最少一次(rabbit支持)】;消息絕對不會丟失,可是可能重複傳輸 那須要->數據庫
- 無需考慮最多一次所須要考慮的東西,消費者發一次,可是這樣很難保證成功。
【剛好一次(rabbit不支持)】:每條消息會傳輸一次且只傳輸一次json
下面結合給出一個解決方案(這個性能不太好,由於要走下面的流程,並且還要對數據庫進行操做):api
- 【發送消息的時候】:把發送的消息保存在數據庫中
- 【當消息發送到rabbit上】:在rabbit上有一個監聽器
- 【當把消息發送到生產者這裏的時候】:消費者給一個回覆(ack)給mq(mq通知生產者,這樣就知道消息投遞成功)
- 【更新消息表中的消息狀態 】
- 這個時候咱們要有一個定時任務去檢查數據庫中的消息狀態(查看是不是發送成功狀態(發送成功會在消費端對數據庫的數據狀態進行修改))
- 消息狀態爲【未知】:把消息拿過來進行重發,而且記錄消息重發的次數(由於不能一直重發,這樣服務器壓力大),同時修改狀態爲定時任務干預,你能夠設置一個閾值,到達必定次數後,改變狀態爲人工干預。
- 消息狀態爲【人工干預】:這個時候就要人工進行處理異常狀態的信息
springBoot中confirm的實現->【生產端】服務器
這裏是監聽功能異步
@Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /*** @param correlationData * 相關配置信息 * @param ack exchange交換機 是否成功收到了消息。true 成功,false表明失敗 * @param cause 失敗緣由 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("成功發送"); }else { System.out.println("失敗"+cause); //這裏進行重發,或者本身的業務處理 } } }交換機和隊列進行綁定tcp
View Code/*** 隊列與交換機綁定 */ @Configuration public class QueueConfig { @Bean(name = "confirmTestQueue") public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean(name = "confirmTestExchange") public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirmTestExchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); } }
生產者發送消息ide
View Code@RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootForkJoinApplication.class) public class Producer { @Autowired private RabbitTemplate rabbitTemplate; //注入rabbitmq對象 @Autowired private ConfirmCallbackService confirmCallbackService; //注入 ConfirmCallback對象 @Test public void test() { rabbitTemplate.setConfirmCallback(confirmCallbackService); //發送消息 rabbitTemplate.convertAndSend("dlq_exchange1", "", "hello,ConfirmCallback你好"); } }自動回調生產者
springBoot配置文件
spring: rabbitmq: publisher-confirm-type: correlated # 開啓confirm確認模式
消費端(進行手動簽收)
@Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try {System.out.println("消息內容===>" + new String(message.getBody())); //TODO 具體業務邏輯 //手動簽收[參數1:消息投遞序號,參數2:批量簽收] channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒絕簽收[參數1:消息投遞序號,參數2:批量拒絕,參數3:是否從新加入隊列] channel.basicNack(deliveryTag, true, true); } } }
就是,當消息成爲死消息後,能夠被從新發送到另外一個隊列,這個隊列就是死信隊列(他和別的隊列沒有區別),綁定這個隊列的交換機就是DLX,當存在有一個死信消息的時候,rebbit會將這個消息發送到死信交換機上,從而路由到死信隊列上
消息變成死消息通常有這幾種狀況:
- 消息被拒絕(basic.reject / basic.nack),而且requeue = false
- 消息TTL過時
- 隊列達到最大長度
TTL(Time-To-Live):表示的是你發送消息的有效期,通常有兩種設置方式
- 聲明隊列的時候,在隊列的屬性中設置
- 發送消息時給消息設置屬性
![]()
- 死信交換機綁定(對於死信隊列,他是一個正常隊列,咱們按照以前的章節中設置的方法設置就行)
模擬訂單支付流程【使用死信隊列】
一個正常的支付消息發送到交換機->交換機路由到,支付消息應有的隊列中->這個時候用戶超時尚未進行消費(TTL)->支付消息變成了死信消息->死信消息進入了死信交換機->死信交換機把死信消息路由到死信隊列中->對死信隊列進行監聽->發信息通知用戶,或者修改訂單狀態
在真正的環境中,咱們須要實時瞭解到咱們mq的狀態和狀況,從而保障他能夠完美的提供服務,如下給出3中常見方案:
【使用management】:這個通常在小項目中進行使用,【簡單,可是麻煩,由於若是是集羣可能要開多個窗口】
【使用他給的api:】 他本身能把數據展現出來到management上,確定調用了http接口,那咱們也能夠這樣幹【須要開發,而且要看相關文檔】,經常使用的api以下:
- 歸納信息:http://localhost:15672/api/overview
- channel 列表:http://localhost:15672/api/channels節點信息:http://localhost:15672/api/nodes
- 交換機信息:http://localhost:15672/api/exchanges
- 隊列信息:http://localhost:15672/api/queues
- vhost 列表:http://localhost:15672/api/vhosts
prometheus + grafana 監控rabbitmq
zabbix不一樣,zabbix是主動接受消息,而普羅米修斯(prometheus )是定時對我們的消息進行拉取的,他制定了一些規範【exporter】,各個想讓他監控的中間件都要實現本身的exporter,而且他要求傳遞給他的都是json的數據,
咱們把rabbit提供的普羅米修斯的插件啓動:rabbitmq-plugins enable rabbitmq_prometheus
同時開啓端口:firewall-cmd --zone=public --add-port=15692/tcp --permanent
訪問普羅米修斯能夠認識的數據:http://你的ip:15692/metrics
剩下的就交給普羅米修斯了
grafana 中搜索相關的模板進行導入(https://grafana.com/grafana/dashboards?search=rabbitmq)
最終效果(中間的過程太繁瑣了,我們只是聊聊這個東西,具體能夠參考別的博文。)
這裏是官方對Prometheus集成獲取指標的文檔 https://www.rabbitmq.com/prometheus.html
【延遲隊列】
實際上仍是能夠用上面的死信隊列的流程進行實現,設置隊列過時時間給你想要的延遲時間,當延遲時間事後,進入死信隊列,這個時候執行真正的邏輯
【消息積壓】
前面說到當內存到必定程度(0.4)的時候,rabbit不會接受消息,咱們說過能夠經過配置文件對這個數據臨時增大,這個時候rabbit運行正常,咱們就能夠增長消費者對數據進行消費,這樣就能夠解決積壓問題。
【不重複消費】:
每一個消費者給一個惟一id,使用id進行判斷