RocketMQ使用

  RocketMQ是阿里巴巴在2012年開源的分佈式消息中間件,目前已經捐贈給Apache基金會,並於2016年11月成爲 Apache 孵化項目。  前端

中間件是一類鏈接軟件組件和應用的計算機軟件,它包括一組服務。以便於運行在一臺或多臺機器上的多個軟件經過網絡進行交互。
中間件技術所提供的互操做性,推進了分佈式體系架構的演進,該架構一般用於支持並簡化那些複雜的分佈式應用程序,它包括web服務器、事務監控器和消息隊列軟件。
中間件(middleware)是基礎軟件的一大類,屬於可複用軟件的範疇。顧名思義,中間件處於操做系統軟件與用戶的應用軟件的中間。
中間件在操做系統、網絡和數據庫之上,應用軟件的下層,總的做用是爲處於本身上層的應用軟件提供運行與開發的環境,幫助用戶靈活、高效地開發和集成複雜的應用軟件。

    中間件是位於平臺(硬件和操做系統)和應用之間的通用服務,這些服務具備標準的程序接口和協議。針對不一樣的操做系統和硬件平臺,中間件能夠有符合接口和協議規範的多種實現:java

  一.理論部分web

  RocketMQ就是一款分佈式消息中間件。那麼,RocketMQ主要爲了解決哪些問題呢?sql

  (1)Publish/Subscribe
  發佈與訂閱是消息中間件的最基本功能,也是相對於傳統RPC通訊而言。
數據庫

  (2)Message Priority
  規範中描述的優先級是指在一個消息隊列中,每條消息都有不一樣的優先級,通常用整數來描述,優先級高的消息先投遞,若是消息徹底在一個內存隊列中,那麼在投遞前能夠按照優先級排序,令優先級高的先投遞。
  因爲RocketMQ全部消息都是持久化的,因此若是按照優先級來排序,開銷會很是大,所以RocketMQ沒有特地支持消息優先級,可是能夠經過變通的方式實現相似功能,即單獨配置一個優先級高的隊列,和一個普通優先級的隊列, 將不一樣優先級發送到不一樣隊列便可。
apache

  (3)Message Order
  消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了3條消息,分別是訂單建立,訂單付款,訂單完成。消費時,要按照這個順序消費纔能有意義。可是同時訂單之間是能夠並行消費的。
  RocketMQ能夠嚴格的保證消息有序。
後端

  (4)Message Filter
  ①Broker端消息過濾  
  在Broker中,按照Consumer的要求作過濾,優勢是減小了對於Consumer無用消息的網絡傳輸。缺點是增長了Broker的負擔,實現相對複雜。
  ②Consumer端消息過濾
  這種過濾方式可由應用徹底自定義實現,可是缺點是不少無用的消息要傳輸到Consumer端。
服務器

  (5)Message Persistence
  消息中間件一般採用的幾種持久化方式:
  ①持久化到數據庫,例如Mysql。
     ②持久化到KV存儲,例如levelDB、伯克利DB等KV存儲系統。
     ③文件記錄形式持久化,例如Kafka,RocketMQ
     ④對內存數據作一個持久化鏡像,例如beanstalkd,VisiNotify
     ⑤前三種持久化方式都具備將內存隊列Buffer進行擴展的能力,第四種方式只是一個內存的鏡像,做用是當Broker掛掉重啓後仍然能將以前內存的數據恢復出來。
網絡

  RocketMQ充分利用Linux文件系統內存cache來提升性能。架構

  (6)Message Reliablity
  影響消息可靠性的幾種狀況:
  ①Broker正常關閉;
     ②Broker異常Crash;
     ③OS Crash;
     ④機器掉電,可是能當即恢復供電狀況。
     ⑤機器沒法開機(多是cpu、主板、內存等關鍵設備損壞)
     ⑥磁盤設備損壞。
  前四種狀況都屬於硬件資源可當即恢復狀況,RocketMQ在這四種狀況下能保證消息不丟,或者丟失少許數據(依賴刷盤方式是同步仍是異步)。
  後兩種狀況屬於單點故障,且沒法恢復,一旦發生,在此單點上的消息所有丟失。RocketMQ在這兩種狀況下,經過異步複製,可保證99%的消息不丟,可是仍然會有極少許的消息可能丟失。經過同步雙寫技術能夠徹底避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。
  RocketMQ從3.0版本開始支持同步雙寫。

  (7)Low Latency Messaging
  在消息不堆積狀況下,消息到達Broker後,能馬上到達Consumer。RocketMQ使用長輪詢Pull方式,可保證消息很是實時,消息實時性不低於Push。
  (8)At least Once
  是指每一個消息必須投遞一次。RocketMQ Consumer先pull消息到本地,消費完成後,才向服務器返回ack,若是沒有消費必定不會ack消息,因此RocketMQ能夠很好的支持此特性。
  (9)Exactly Only Once
     ①發送消息階段,不容許發送重複的消息。
     ②消費消息階段,不容許消費重複的消息。
  只有以上兩個條件都知足狀況下,才能認爲消息是「Exactly Only Once」,而要實現以上兩點,在分佈式系統環境下,不可避免要產生巨大的開銷。因此RocketMQ爲了追求高性能,並不保證此特性,要求在業務上進行去重,也就是說消費消息要作到冪等性。RocketMQ雖然不能嚴格保證不重複,可是正常狀況下不多會出現重複發送、消費狀況,只有網絡異常,Consumer啓停等異常狀況下會出現消息重複。

  (10)Broker的Buffer問題

  Broker的Buffer一般指的是Broker中一個隊列的內存Buffer大小,這類Buffer一般大小有限。
  另外,RocketMQ沒有內存Buffer概念,RocketMQ的隊列都是持久化磁盤,數據按期清除。RocketMQ同其餘MQ有很是顯著的區別,RocketMQ的內存Buffer抽象成一個無限長度的隊列,無論有多少數據進來都能裝得下,這個無限是有前提的,Broker會按期刪除過時的數據,例如Broker只保存3天的消息,那麼這個Buffer雖然長度無限,可是3天前的數據會被從隊尾刪除。
  (11)回溯消費
  回溯消費是指Consumer已經消費成功的消息,因爲業務上的需求須要從新消費,要支持此功能,Broker在向Consumer投遞成功消息後,消息仍然須要保留。而且從新消費通常是按照時間維度,例如因爲Consumer系統故障,恢復後須要從新消費1小時前的數據,那麼Broker要提供一種機制,能夠按照時間維度來回退消費進度。
  RocketMQ支持按照時間回溯消費,時間維度精確到毫秒,能夠向前回溯,也能夠向後回溯。
  (12)消息堆積
  消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峯,保證後端系統的穩定性,這就要求消息中間件具備必定的消息堆積能力,消息堆積分如下兩種狀況:
     ①消息堆積在內存Buffer,一旦超過內存Buffer,能夠根據必定的丟棄策略來丟棄消息,如CORBA Notification規範中描述。適合能容忍丟棄消息的業務,這種狀況消息的堆積能力主要在於內存Buffer大小,並且消息堆積後,性能降低不會太大,由於內存中數據多少對於對外提供的訪問能力影響有限。
     ②消息堆積到持久化存儲系統中,例如DB,KV存儲,文件記錄形式。 當消息不能在內存Cache命中時,要不可避免的訪問磁盤,會產生大量讀IO,讀IO的吞吐量直接決定了消息堆積後的訪問能力。
  評估消息堆積能力主要有如下四點:
  消息能堆積多少條,多少字節?即消息的堆積容量。
     消息堆積後,發消息的吞吐量大小,是否會受堆積影響?
     消息堆積後,正常消費的Consumer是否會受影響?
     消息堆積後,訪問堆積在磁盤的消息時,吞吐量有多大?
  (13)分佈式事務
  已知的幾個分佈式事務規範,如XA,JTA等。其中XA規範被各大數據庫廠商普遍支持,如Oracle,Mysql等。其中XA的TM實現佼佼者如Oracle Tuxedo,在金融、電信等領域被普遍應用。
  分佈式事務涉及到兩階段提交問題,在數據存儲方面的方面必然須要KV存儲的支持,由於第二階段的提交回滾須要修改消息狀態,必定涉及到根據Key去查找Message的動做。RocketMQ在第二階段繞過了根據Key去查找Message的問題,採用第一階段發送Prepared消息時,拿到了消息的Offset,第二階段經過Offset去訪問消息,並修改狀態,Offset就是數據的地址。
  RocketMQ這種實現事務的方式,沒有經過KV存儲作,而是經過Offset方式,存在一個顯著缺陷,即經過Offset更改數據,會令系統的髒頁過多,須要特別關注。
  (14)定時消息
  定時消息是指消息發到Broker後,不能馬上被Consumer消費,要到特定的時間點或者等待特定的時間後才能被消費。
  若是要支持任意的時間精度,在Broker層面,必需要作消息排序,若是再涉及到持久化,那麼消息排序要不可避免的產生巨大性能開銷。
  RocketMQ支持定時消息,可是不支持任意時間精度,支持特定的level,例如定時5s,10s,1m等。
  (15)消息重試
  Consumer消費消息失敗後,要提供一種重試機制,令消息再消費一次。Consumer消費消息失敗一般能夠認爲有如下幾種狀況:
  因爲消息自己的緣由,例如反序列化失敗,消息數據自己沒法處理(例如話費充值,當前消息的手機號被註銷,沒法充值)等。這種錯誤一般須要跳過這條消息,再消費其餘消息,而這條失敗的消息即便馬上重試消費,99%也不成功,因此最好提供一種定時重試機制,即過10s秒後再重試。
     因爲依賴的下游應用服務不可用,例如db鏈接不可用,外系統網絡不可達等。遇到這種錯誤,即便跳過當前失敗的消息,消費其餘消息一樣也會報錯。這種狀況建議應用sleep 30s,再消費下一條消息,這樣能夠減輕Broker重試消息的壓力。
  RocketMQ的設計模型:

  簡單說來,RocketMQ具備如下特色:
  ①是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色。
     ②Producer、Consumer、隊列均可以分佈式。
     ③Producer向一些隊列輪流發送消息,隊列集合稱爲Topic,Consumer若是作廣播消費,則一個consumer實例消費這個Topic對應的全部隊列,若是作集羣消費,則多個Consumer實例平均消費這個topic對應的隊列集合。
     ④可以保證嚴格的消息順序。
     ⑤提供豐富的消息拉取模式。
     ⑥高效的訂閱者水平擴展能力。
     ⑦實時的消息訂閱機制。
     ⑧億級消息堆積能力。
     ⑨較少的依賴。

  RocketMQ 物理部署結構:

  RocketMQ的部署結構有如下特色:

  ①Name Server是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。
     ②Broker部署相對複雜,Broker分爲Master與Slave,一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master,Master與Slave的對應關係經過指定相同的BrokerName,不一樣的BrokerId來定義,BrokerId爲0表示Master,非0表示Slave。Master也能夠部署多個。每一個Broker與Name Server集羣中的全部節點創建長鏈接,定時註冊Topic信息到全部Name Server。
     ③Producer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master創建長鏈接,且定時向Master發送心跳。Producer徹底無狀態,可集羣部署。
     ④Consumer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。

  RocketMQ 邏輯部署結構:

  RocketMQ的邏輯部署結構有Producer和Consumer兩個特色。
  (1)Producer Group
  用來表示一個發送消息應用,一個Producer Group下包含多個Producer實例,能夠是多臺機器,也能夠是一臺機器的多個進程,或者一個進程的多個Producer對象。一個Producer Group能夠發送多個Topic消息,Producer Group做用以下:
     ①標識一類Producer;
     ②能夠經過運維工具查詢這個發送消息應用下有多個Producer實例;
    ③發送分佈式事務消息時,若是Producer中途意外宕機,Broker會主動回調Producer Group內的任意一臺機器來確認事務狀態。
  (2)Consumer Group
  用來表示一個消費消息應用,一個Consumer Group下包含多個Consumer實例,能夠是多臺機器,也能夠是多個進程,或者是一個進程的多個Consumer對象。一個Consumer Group下的多個Consumer以均攤方式消費消息,若是設置爲廣播方式,那麼這個Consumer Group下的每一個實例都消費全量數據。
  RocketMQ 數據存儲結構:

  

  RocketMQ採起了一種數據與索引分離的存儲方法。有效下降文件資源、IO資源,內存資源的損耗。即使是阿里這種海量數據,高併發場景也可以有效下降端到端延遲,並具有較強的橫向擴展能力。

  二.實踐部分

  1.在服務器上安裝RocketMQ

   此處略。

  2.程序中使用RocketMQ

   建立一個maven項目,在pom文件中添加RocketMQ客戶端jar包的依賴。

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
</dependency>

   建立生產者:

//Producer.java
package itszt; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * 生產者 */ public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("127.0.0.1:9876"); try { producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } } }

   建立消費者:

//Consumer.java
package itszt;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 消費者
 */
public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        try {
            //訂閱PushTopic下Tag爲push的消息
            consumer.subscribe("PushTopic", "push");

            //程序第一次啓動從消息隊列頭取數據
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                                                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) {
                                                     Message msg = list.get(0);
//                            System.out.println(msg.toString());

                                                     String topic = msg.getTopic();
                                                     System.out.println("topic = " + topic);
                                                     byte[] body = msg.getBody();
                                                     System.out.println("body:  " + new String(body));
                                                     String keys = msg.getKeys();
                                                     System.out.println("keys = " + keys);
                                                     String tags = msg.getTags();
                                                     System.out.println("tags = " + tags);
                                                     System.out.println("-----------------------------------------------");

                                                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                                 }
                                             }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索