拍拍貸消息系統原理與應用

前言

在5月12日的Java開發者大會上,除了我本人進行分享以外,還有其餘5位優秀的老師也有精彩的分享。sql

今天我將根據本身的記憶來給你們分享下 拍拍貸 基礎框架研發資深專家 李乘勝老師的演講內容。數據庫

李乘勝老師演講的主題是:拍拍貸消息系統原理與應用,也就是拍拍貸內部使用的消息系統,自研發的,沒有用市場上開源的。bash

咱們都知道,對於大廠來講,是有絕對的自研發的技術實力。自研發每每能更貼近公司實際的業務使用場景,因此不少大廠都選擇了自研。微信

有一個好消息就是後面這套消息系統後面會開源,咱們又多了一個很是優秀的消息系統。可是在目前爲止尚未對應的文章來介紹這套消息系統,因此今天的文章你們要認真閱讀哦!多線程

內容有點偏多,不會所有寫出來,感興趣的朋友能夠加我微信 jihuan900 我發原始的PPT給你參考學習。架構

介紹

拍拍貸消息系統是拍拍貸中間件團隊,在普遍調研業界開源消息系統的基礎上,結合公司現狀和自身實踐,研發的一款輕量級消息系統。具備豐富的功能和完善的治理框架

對於框架的選型咱們最在乎的就是它的性能怎麼樣?有沒有被大規模使用?運維

  • 支撐拍拍貸天天高峯27+億消息量、1000多G的消息數據量。
  • 平常高峯2萬Tps寫入。

消息系統你最在乎的問題有哪些

  1. 消息發送慢怎麼辦? 有監控功能,發現問題後能夠擴容Broker或者擴容隊列
  2. 消息堆積了,如何快速知曉與處理? 這個能夠根據客戶端的監控查看消費耗時和機器負載來覺定是否加隊列,加線程數,或者加消費實例
  3. 失敗消息如何處理? 獨立的失敗隊列,重試
  4. 如何保障消息的高可靠? 數據庫保證

上面4點是李老師PPT上介紹的,我在這邊補充幾點:ide

  • 如何保證消息的有序性? 目前消息存儲在Mysql中,id是自增加,消費的時候從小到大消費,只能保證單隊列有序性
  • 如何實現延遲消息? 能夠在後臺配置延遲多久消費,文章後面會進行講解
  • 可否支持消息回溯? 後臺修改偏移量
  • 部署維護是否方便? 這個部署運維簡單,分庫分表不用本身考慮,portal有部署腳本

上面是咱們在選用消息系統的時候一般都會去考慮的問題,目前市面上最多見的有Kafka、RabbitMQ、RocketMQ 等,那麼拍拍貸的消息系統跟這些開源的區別在哪呢?下面咱們一塊兒來了解下這款消息系統的架構設計。微服務

架構設計

架構設計

拍拍貸消息系統的設計仍是簡潔易懂的,消息存儲直接用了數據庫來實現,不用考慮存儲這塊的複雜性。

發佈消息使用Http協議,支持全部語言,消息訂閱也是Http協議,目前採起的是主動拉取消息的方式,批量拉取。

Broker 是無狀態的,能夠搭建集羣,水平擴展,無單點問題

在個人書《Spring Cloud微服務-全棧技術與案例解析》中也有對消息可靠性的介紹,大概的思路跟今天介紹的消息系統差很少,都是消息先落地到數據庫中。

個人是將數據庫中的消息投遞到消息隊列,經過消息隊列來消費,消費完以後手動確認消費加劇試來保證可靠性,固然在裏面也能夠作不少治理的功能。我這樣作主要是對於生產方來講屏蔽了隊列,封裝成了消息服務,可是對於消費方來講仍是要關注隊列的存在。

拍拍貸這套就徹底拋棄了第三方的消息隊列,消費消息也是本身開發的,拉取模式。

消息複用與偏移

每一個消費方會記錄本身的偏移量,後臺還能夠動態修改偏移量來達到消息回溯的功能。

功能點

動態啓停消費

動態調整偏移

支持延時消息,支持多線程消費,動態調整

消息消費狀況查看

指定失敗消息從新消費

監控治理

在監控這塊,咱們最關心的有如下幾點:

  • 消費失敗告警
  • 消息堆積告警
  • Topic消息報表, 天天的消費狀況,性能等
  • 發送效率慢,怎麼排查?

對於一些告警,該消息系統都支持了,並且有很是多豐富的報表功能,監控對接了Cat,排查問題很是方便。

客戶端發送

客戶端發送消息很是簡單,有現成封裝好了的SDK

MqClient.publish("Test1", "", new ProducerDataDto(「111111")); 複製代碼

客戶端消費

配置訂閱

<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
   <consumer groupName="Test1Sub">
      <topics>
         <topic name="Test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
     </topics>
   </consumer>
</messageQueue>
複製代碼

代碼訂閱

ConsumerGroupVo consumerGroup = new ConsumerGroupVo("Test1Sub");
ConsumerGroupTopicVo topicVo = new ConsumerGroupTopicVo();
topicVo.setName("Test");
topicVo.setSubscriber(new ISubscriber() {
    @Override
    public List<Long> onMessageReceived(List<MessageDto> messages) {
        return null;
    }
});
consumerGroup.addTopic(topicVo);
MqClient.registerConsumerGroup(consumerGroup);
複製代碼

猿天地
相關文章
相關標籤/搜索