那天我和同事一塊兒吃完晚飯回公司加班,而後就羣裏就有人@我說xxx商戶說收不到推送,一開始以爲沒啥。我第一反應是否是極光沒註冊上,就讓客服通知商戶,從新登陸下試試。這邊打開極光推送的後臺進行檢查。後面反應收不到推送的愈來愈多,我就知道這事情不簡單。java
因爲大量商戶反應收不到推送,我第一反應是否是推送系統掛了,致使沒有進行推送。因而讓運維老哥檢查推送系統各節點的狀況,發現都正常。因而打開RabbitMQ的管控臺看了一下,人都蒙了。已經有幾萬條消息處於ready
狀態,還有幾百條unacked
的消息。git
我覺得推送服務和MQ鏈接斷開了,致使沒法推送消息,因而讓運維重啓推送服務,將全部的推送服務重啓完,發現unacked
的消息所有變成ready
,可是沒過多久又有幾百條unacked
的消息了,這個就很明顯了能消費,沒有進行ack
呀。spring
當時我覺得是網絡問題,致使mq沒法接收到ack
,讓運維老哥檢查了一下,發現網絡沒問題。如今看是真的是傻,網絡有問題鏈接都連不上。因爲肯定的是沒法ack
形成的,立馬將ack模式
由原來的manual
改爲auto
緊急發佈。將全部的節點升級好之後,發現推送正常了。shell
你覺得這就結束了其實並無,沒過多久發現有一臺MQ服務出現異常,因爲生產採用了鏡像隊列
,當即將這臺有問題的MQ從集羣中移除。直接進行重置,而後加入回集羣。這事情算是告一段落了。此時已經接近24:00了。json
時間來到次日上午10:00,運維那邊又出現報警了,說推送系統有臺機器,磁盤快被寫滿了,而且佔用率很高。個人乖乖從昨晚到如今寫了快40G的日誌,一看報錯信息瞬間就明白問題出在哪裏了。麻溜的把bug
修了緊急發佈。網絡
吐槽一波公司的ELK,壓根就沒有收集到這個報錯信息,致使我沒有及時發現。
spring: # 消息隊列 rabbitmq: host: 10.0.0.53 username: guest password: guest virtual-host: local port: 5672 # 消息發送確認 publisher-confirm-type: correlated # 開啓發送失敗退回 publisher-returns: true listener: simple: # 消費端最小併發數 concurrency: 1 # 消費端最大併發數 max-concurrency: 5 # 一次請求中預處理的消息數量 prefetch: 2 # 手動應答 acknowledge-mode: manual
@RabbitListener(queues = ORDER_QUEUE) public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception { // 解密和解析 String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto); OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class); try { // 模擬推送 pushMsg(orderDto); }catch (Exception e){ log.error("推送失敗-錯誤信息:{},消息內容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto)); }finally { // 消息簽收 channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); } }
看起來好像沒啥問題。因爲和交易系統約定好,訂單數據須要先轉換json
串,而後再使用AES
進行加密,因此這邊須要,先進行解密而後在進行解析。才能獲得訂單數據。併發
爲了防止消息丟失,交易系統作了失敗重發
機制,防止消息丟失,不巧的是重發的時候沒有對訂單數據進行加密。這就致使推送系統,在解密的時候出異常,從而沒法進行ack
。運維
默默的吐槽一句:人在家中坐,鍋從天上來。
推送代碼curl
發送3條正常的消息fetch
curl http://localhost:8080/sendMsg/3
發送1條錯誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發送3條正常的消息
curl http://localhost:8080/sendMsg/3
觀察日誌發下,雖然有報錯,可是還能正常進行推送。可是RabbitMQ已經出現了一條unacked
的消息。
繼續發送1條錯誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發送3條正常的消息
curl http://localhost:8080/sendMsg/3
這個時候你會發現控制檯報錯,固然錯誤信息是解密失敗,可是正常的消息卻沒有被消費,這個時候其實隊列已經阻塞了。
從RabbitMQ
管控臺也能夠看到,剛剛發送的的3條消息處於ready
狀態。這個時候就若是一直有消息進入,都會堆積在隊裏裏面沒法被消費。
再發送3條正常的消息
curl http://localhost:8080/sendMsg/3
上面說了是因爲沒有進行ack
致使隊裏阻塞。那麼問題來了,這是爲何呢?其實這是RabbitMQ
的一種保護機制。防止當消息激增的時候,海量的消息進入consumer
而引起consumer
宕機。
RabbitMQ提供了一種QOS(服務質量保證)功能,即在非自動確認的消息的前提下,限制信道上的消費者所能保持的最大未確認的數量。能夠經過設置PrefetchCount
實現。
舉例說明:能夠理解爲在consumer
前面加了一個緩衝容器,容器能容納最大的消息數量就是PrefetchCount
。若是容器沒有滿RabbitMQ
就會將消息投遞到容器內,若是滿了就不投遞了。當consumer
對消息進行ack
之後就會將此消息移除,從而放入新的消息。
listener: simple: # 消費端最小併發數 concurrency: 1 # 消費端最大併發數 max-concurrency: 5 # 一次處理的消息數量 prefetch: 2 # 手動應答 acknowledge-mode: manual
prefetch參數就是PrefetchCount
經過上面的配置發現prefetch
我只配置了2,而且concurrency
配置的只有1,因此當我發送了2條錯誤消息之後,因爲解密失敗這2條消息一直沒有被ack
。將緩衝區沾滿了,這個時候RabbitMQ
認爲這個consumer
已經沒有消費能力了就不繼續給它推送消息了,因此就形成了隊列阻塞。
當ack
模式爲manual
,而且線上出現了unacked
消息,這個時候不用慌。因爲QOS是限制信道channel
上的消費者所能保持的最大未確認的數量。因此容許出現unacked
的數量能夠經過channelCount * prefetchCount * 節點數量
得出。
channlCount
就是由concurrency
,max-concurrency
決定的。
min
= concurrency * prefetch * 節點數量
max
= max-concurrency * prefetch * 節點數量
由此能夠的出結論
unacked_msg_count
< min
隊列不會阻塞。但須要及時處理unacked
的消息。unacked_msg_count
>= min
可能會出現堵塞。unacked_msg_count
>= max
隊列必定阻塞。這裏須要好好理解一下。
其實處理的方法很簡單,將解密和解析的方法放入try catch
中就解決了這樣無論解密正常與否,消息都會被簽收。若是出錯將會輸出錯誤日誌,讓開發人員進行處理了。
對於這個就須要有日誌監控系統,來及時告警了。
@RabbitListener(queues = ORDER_QUEUE) public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception { try { // 解密和解析 String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto); OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class); // 模擬推送 pushMsg(orderDto); }catch (Exception e){ log.error("推送失敗-錯誤信息:{},消息內容:{}", e.getLocalizedMessage(), encryptOrderDto); }finally { // 消息簽收 channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); } }
unacked
的消息在consumer
切斷鏈接後(重啓),會自動回到隊頭。
一開始我不知道代碼有問題,就是覺得單純的沒有進行ack
因此將ack
模式改爲auto
自動,緊急升級了,這樣無論正常與否,消息都會被簽收,因此在當時確實是解決了問題。
其實如今回想起來是很是危險的操做的,將ack
模式改爲auto
自動,這樣會使QOS不生效。會出現大量消息涌入consumer
從而形成consumer
宕機,能夠是由於當時在晚上,交易比較少,而且推送系統有多個節點,纔沒出現問題。
@RabbitListener(queues = ORDER_QUEUE) public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception { // 解密和解析 String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto); OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class); try { // 模擬推送 pushMsg(orderDto); }catch (Exception e){ log.error("推送失敗-錯誤信息:{},消息內容:{}", e.getLocalizedMessage(), encryptOrderDto); }finally { // 消息簽收 channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); } }
配置文件
listener: simple: # 消費端最小併發數 concurrency: 1 # 消費端最大併發數 max-concurrency: 5 # 一次處理的消息數量 prefetch: 2 # 手動應答 acknowledge-mode: auto
因爲當時不知道交易系統的重發機制,重發時沒有對訂單數據加密的bug,因此仍是會發出少許有誤的消息。
發送1條錯誤的消息
curl http://localhost:8080/sendErrorMsg/1
RabbitMQ
消息監聽程序異常時,consumer
會向rabbitmq server
發送Basic.Reject
,表示消息拒絕接受,因爲Spring
默認requeue-rejected
配置爲true
,消息會從新入隊,而後rabbitmq server
從新投遞。就至關於死循環了,因此控制檯在瘋狂刷錯誤日誌形成磁盤利用率飆升的緣由。
將default-requeue-rejected: false
便可。
try { // 業務邏輯。 }catch (Exception e){ // 輸出錯誤日誌。 }finally { // 消息簽收。 }
https://gitee.com/huangxunhui...
若是有人告訴你遇到線上事故不要慌,除非是超級大佬久經沙場。不然就是瞎扯淡,你讓他來試試,看看他會不會大腦一片空白,直冒汗。
若是以爲對你有幫助,能夠多多評論,多多點贊哦,也能夠到個人主頁看看,說不定有你喜歡的文章,也能夠隨手點個關注哦,謝謝。