上週,咱們舉辦了第二屆技術沙龍,我這邊主要演講了消息隊列技術的議題,現分享給你們:數組
在咱們團隊內部,隨着消息應用中心(任務中心)的普遍應用,有時候咱們感受不到消息隊列的存在,但這不影響消息隊列在高可用、分佈式、高併發架構下的核心地位。服務器
消息隊列都應用到了哪些實際的應用場景中?網絡
1、再談消息隊列的應用場景架構
可是,咱們對消息隊列的底層技術和原理仍是不瞭解,那麼咱們立刻開始吧…併發
2、消息隊列的一些基本概念和簡單原理異步
1. Broker分佈式
Broker的概念來自與Apache ActiveMQ,通俗的講就是MQ的服務器。高併發
2. 消息的生產者、消費者性能
消息生產者Producer:發送消息到消息隊列。線程
消息消費者Consumer:從消息隊列接收消息。
3. 點對點消息隊列模型
消息生產者向一個特定的隊列發送消息,消息消費者從該隊列中接收消息;
消息的生產者和消費者能夠不一樣時處於運行狀態。
每個成功處理的消息都由消息消費者簽收確認(Acknowledge)。如圖:
4. 發佈訂閱消息模型-Topic
發佈訂閱消息模型中,支持向一個特定的主題Topic發佈消息,0個或多個訂閱者接收來自這個消息主題的消息。在這種模型下,發佈者和訂閱者彼此不知道對方。實際操做過程當中,
發佈訂閱消息模型中,支持向一個特定的主題Topic發佈消息,0個或多個訂閱者接收來自這個消息主題的消息。在這種模型下,發佈者和訂閱者彼此不知道對方。實際操做過程當中,
必須先訂閱,再發送消息,然後接收訂閱的消息,這個順序必須保證。
5. 消息的順序性保證
基於Queue消息模型,利用FIFO先進先出的特性,能夠保證消息的順序性。
6. 消息的ACK確認機制
即消息的Ackownledge確認機制,
爲了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列即可以刪除這個消息了。若是Consumer宕機/關閉,沒有發送ACK,消息隊列將認爲這個消息沒有被處理,會將這個消息從新發送給其餘的Consumer從新消費處理。
7. 消息的持久化
消息的持久化,對於一些關鍵的核心業務來講是很是重要的,啓用消息持久化後,消息隊列宕機重啓後,消息能夠從持久化存儲恢復,消息不丟失,能夠繼續消費處理。
8. 消息的同步和異步收發
同步:消息的收發支持同步收發的方式。
同時還有另外一種同步方式:同步收發場景下,消息生產者和消費者雙向應答模式,例如:張三寫封信送到郵局中轉站,而後李四從中轉站得到信,而後在寫一份回執信,放到中轉站,而後張三去取,固然張三寫信的時候就得寫明回信地址
消息的接收若是以同步的方式(Pull)進行接收,若是隊列中爲空,此時接收將處於同步阻塞狀態,會一直等待,直到消息的到達。
異步:消息的收發一樣支持異步方式:異步發送消息,不須要等待消息隊列的接收確認;異步接收消息,以Push的方式觸發消息消費者接收消息。
9. 消息的事務支持
消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這處於同一個事務範圍內,若是一個消息處理失敗,事務回滾,消息從新回到隊列中。
3、咱們對消息隊列的實際使用
咱們使用了兩種消息隊列組件:
RabbitMQ:高可用、高可靠消息應用場景,例如記帳失敗重試、通知服務,消息不容許丟
Kafka:高性能消息應用場景,例如日誌、監控,消息容許丟失。
在此之上,咱們封裝了消息應用中心、日誌服務等核心組件和服務。那麼,消息應用中心和日誌都用到了消息隊列什麼技術? 乾貨來了…
1. 消息應用中心
消息應用中心(任務中心)使用了消息隊列的異步處理、數據同步、重試補償、系統解耦、流量消峯等特性。其中:
消息應用中心(任務中心),支持RabbitMQ和Kafka兩種消息通道,支持在任務元數據層面設置
任務:就是一個包含了任務執行上下文的消息,同時表明了異步處理
任務發送者(ITaskSender)發送任務:消息的生產者將任務消息發送的消息隊列
任務類型:消息隊列名稱,例如:HaKeepAcco***Queue,充電補償記帳隊列
消息隊列:任務的臨時存儲
任務中心:任務集中處理,消息消費者
任務處理完成:消息Ack確認
任務的多級重試:多個重試消息隊列,HaSysTaskStore2Queue
2. 日誌組件
日誌組件,使用了消息隊列的高併發緩衝和發佈訂閱特性。其中:
日誌組件使用Kafka做爲消息通道,由於Kafka的性能好,吞吐量大, 能夠容忍偶爾的消息數據丟失
日誌組件使用發佈訂閱的消息模型
日誌組件包含日誌服務SDK和日誌HSF服務,兩者都是消息的生產者Producer
日誌類型:消息的Topic主題
日誌處理器:消息的消費者、Topic的訂閱、日誌數據處理(Hbase\ES\其餘)
3. RPC服務狀態變化通知
RPC服務狀態變化通知,使用了消息隊列的發佈訂閱特性。其中:
RPC服務狀態變化通知,使用了RabbitMQ消息隊列技術
使用發佈訂閱的消息模型
Topic:RPCServiceState
RPCService.Proxy:RPC服務狀態變化消息的訂閱者
RPC服務註冊、發佈:消息的生產者,發送RPC服務狀態變化消息。
4、消息隊列使用的最佳實踐
1. RabbitMQ的鏈接,底層都是Socket鏈接,長鏈接 or 短鏈接?
RabbitMQ每一個在建立每一個鏈接的同時,會自動建立一個監視線程來定時(默認60s)偵測鏈接的狀態,若是鏈接斷開,觸發ConnectionShutdown事件。
用長鏈接,仍是用短鏈接??
發送端:建議使用短鏈接,用完即釋放,避免長鏈接帶來的端口占用,由於發送端無處不在,發送操做短而急促。
接收端:建議使用長鏈接,時刻接收處理消息,由於消息的接收消費比較集中,接收操做久而彌堅。
2. 網絡是有抖動的,鏈接的斷開是正常的,如何應對?
發送端:發送失敗重試
接收端:註冊ConnectionShutdown事件同時捕獲消息接收異常,從新創建鏈接,接收消費消息
3. RabbitMQ Exchange(Topic)模式下帶來的消息隊列數量激增
只是建立了一個Exchange(Topic),爲何會增長這麼多Queue。
由於,每一個Topic的訂閱都是綁定一個Queue用做消息的消費。
4. 需求的演變,消息結構的變動,如何平滑過分?
消息是byte[]數組,咱們將複雜對象消息二進制序列化。
接收到消息後,咱們將二進制數組反序列化爲實體類。
當咱們的實體類消息體的結構發生變化後,由於受二進制序列化處理的
影響,致使沒法反序列化。
解決方案:
消息體預留一些string類型的擴展字段
消息隊列版本化,支持多個版本的消息體。
5. Kafka Consumer Group
同一Topic的一條消息只能被同一個Group內的一個Consumer消費
多個Consumer Group可同時消費同一條消息
6. 消息的積壓
消息的積壓產生的緣由:消息接收消費的速率低,發送的速度>接收的速度。
消息積壓後的影響:
消息大量積壓後,當新的消費者鏈接上MQ並開始接收消息時,發送速率會大幅下降。
消息隊列集羣的壓力增長,大量的消息要持久化存儲和同步。
如何減小消息積壓:快速消費消息,同時保持消息體的不要過大。
周國慶
2017/7/3