RocketMQ學習筆記

RocketMQ學習筆記

參考資料:java

安裝啓動(非集羣模式)

  1. 官網下載二進制安裝包(固然也可下載源碼包後本身編譯):下載地址mysql

  2. 解壓redis

    unzip rocketmq-all-4.3.0-bin-release.zip
  3. 修改配置sql

    • conf/broker.conf 中新增數據庫

      brokerIP1 = 192.168.195.88
      autoCreateTopicEnable = true  # 線上環境應該設爲false
    • bin/runbroker.sh中修改JVM內存大小,默認是8G,通常本身電腦的上虛擬機可能沒這麼大apache

  4. 啓動架構

    # 後臺啓動NameServer
    	nohup sh bin/mqnamesrv -n 192.168.195.88:9876 &  
    	# 查看日誌,看是否啓動成功
    	tail -f ~/logs/rocketmqlogs/namesrv.log
    
    	# 後臺啓動Broker
    	nohup sh bin/mqbroker -n 192.168.195.88:9876 -c conf/broker.conf &
    	# 查看日誌,看是否啓動成功
    	tail -f ~/logs/rocketmqlogs/broker.log
  5. 中止dom

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv

    注意:異步

    • 記得在防火牆中開啓9876(nameServer用)、10909(生產者用)、10911(消費者用)端口
    • 上面的192.168.195.88爲我本身的機器的IP,不要使用localhost

經常使用命令

在RocketMQ的bin目錄下有一個mqadmin腳本,它充當着控制檯的角色,能夠用來完成咱們經常使用的操做。如不喜歡命令可安裝第三方的可視化操控界面工具async

  1. 建立topic

    sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t testTopic1
    # 參數 -n爲nameServe服務地址  -b爲broker服務地址  -t爲topic的名字
  2. 查詢全部topic

    sh mqadmin topicList -n localhost:9876
    # 參數 -n爲nameServe服務地址
  3. 查看Topic統計信息

    sh mqadmin topicStatus -n localhost:9876 -t testTopic1
    # 參數 -n爲nameServe服務地址 -t爲topic的名字
  4. 查看消費組信息

    sh mqadmin consumerProgress -n localhost:9876 -g simple_push_consumer_group_01
    # 參數 -n爲nameServe服務地址 -g爲消費組的名字,無則表示查看全部的
  5. 查看全部命令:sh mqadmin

原理概述

系統架構

系統架構圖

注意點:

  • NameServer提供服務發現和路由。 每一個 NameServer 記錄完整的路由信息,提供等效的讀寫服務,Broker啓動後將本身註冊至NameServer;隨後每隔30s按期向NameServer上報Topic路由信息。每一個 Broker 與NameServer 集羣中的全部節點都會創建長鏈接。
  • Producer 與 NameServer 集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從 NameServer 獲取 Topic 路由信息,並向提供 Topic 服務的 Broker Master 創建長鏈接,且定時向 Broker 發送心跳。Producer 只能將消息發送到 Broker master。
  • Consumer 可同時和提供Topic服務的master和Slave創建長鏈接,即在master節點宕機時,消費者能夠從slave節點讀取消息。
  • Broker的主從切換問題,暫未研究

數據存儲

RocketMQ的數據存儲主要有三個內容:ConsumeQueue、CommitLog和IndexFile

ConsumeQueue是消息的邏輯隊列,是由20字節定長的二進制數據單元組成,其中commitLogOffset(8 byte)、msgSize(4 byte)、tagsHashCode(8 byte);每一個Topic和QueuId對應一個ConsumeQueue;單個文件大小約5.72M,每一個文件由30W條數據組成,每一個文件默認大小爲600萬個字節,當一個ConsumeQueue類型的文件寫滿了,則寫入下一個文件。

CommitLog是消息存放的實際物理位置,每一個Broker下全部的Topic下的消息隊列共用同一個CommitLog的日誌數據文件來存儲,全部RocketMQ的寫入是順序的。單個CommitLog文件的默認大小爲1G。

IndexFile即消息索引,若是一個消息包含key值的話,會使用IndexFile存儲消息索引,其每一個單元的數據構成爲keyHash(4 byte)、commitLogOffset(8 byte)、timestamp(4 byte)、nextIndexOffset(4byte)。IndexFile主要是用來根據key來查詢消息。

數據存儲結構圖

  • Producer端發送消息最終寫入的是CommitLog,寫入CommitLog有同步刷盤和異步刷盤兩種方式:

    同步刷盤:只有在消息真正持久化至磁盤後,Broker端纔會真正地返回給Producer端一個成功的ACK響應。

    異步刷盤:只要消息寫入PageCache便可將成功的ACK返回給Producer端。

  • Consumer端先從ConsumeQueue讀取持久化消息的offset,隨後再從CommitLog中進行讀取消息的真正實體內容。因此實際上讀取操做是隨機而不是順序的,因此這也是消費速度是比Kafka低的緣由。

    更加詳細的介紹請參考該鏈接的第五節

生產者

RocketMQ發送消息有三種方式

  • 同步

    消息發送後,等待服務端的ack響應,這種方式最可靠,但效率最低

  • 異步

    消息發送註冊回調函數,不需等待服務端的響應

  • 單向

    消息發送後,不關心服務端是否成功接受

若是Producer發送消息失敗,會自動重試,重試的策略:

  1. 重試次數 < retryTimesWhenSendFailed(可配置)
  2. 總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
  3. 同時知足上面兩個條件後,Producer會選擇另一個隊列發送消息

消費者

RocketMQ消費消息主要有兩種方式:

  • pull模式

    由消費者客戶端主動向服務端拉取消息。

    通常狀況下,若是咱們沒有控制好pull的頻率,頻率太低時,則可能消費速度過低致使消息的積壓,頻率太高時,則可能發送過多無效或低效pull請求,增長了服務端負載。

    爲了解決這個問題,RocketMQ在沒有足夠的消息時(如服務端沒有可消費的消息),並不會當即返回響應,而是保持並掛起當前請求,待有足夠的消息時在返回。而且咱們須要指定offset的起點和終點,而且須要咱們本身保存好本次消費的offset點,下次消費的時候好從上次的offset點開始拉取消息。

    pull模式咱們並不常用。

  • push模式

    由服務端主動地將消息推送給消費者。

    push模式下,慢消費的狀況可能致使消費者端的緩衝區溢出。

    可是在RocketMQ中並非真正的push,而是基於長輪訓的pull模式的來實現的僞push。具體的實現是:Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求後,若是有消息就當即返回數據,Consumer端收到返回的消息後,再回調消費者設置的Listener方法。若是broker在收到Pull請求時,消息隊列裏沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。

消費重試

即消費失敗後,隔一段時間從新消費該消息。

  1. 重試隊列

    RocketMQ會爲每一個消費組都設置一個Topic名稱爲%RETRY%+consumerGroup的重試隊列(這裏須要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每一個Topic設置的),用於暫時保存由於各類異常而致使Consumer端沒法消費的消息。Consumer端出現異常失敗時,失敗的消息會從新發送給服務端的重試隊列。

  2. 死信隊列

    重試隊列中超過配置的「最大重試消費次數」後就會移入到這個死信隊列中。在RocketMQ中,SubscriptionGroupConfig配置常量默認地設置了兩個參數,一個是retryQueueNums爲1(重試隊列數量爲1個),另一個是retryMaxTimes爲16(最大重試消費的次數爲16次)。Broker端經過校驗判斷,若是超過了最大重試消費次數則會將消息移至這裏所說的死信隊列。這裏,RocketMQ會爲每一個消費組都設置一個Topic命名爲%DLQ%+consumerGroup的死信隊列。

    通常在實際應用中,移入至死信隊列的消息,須要人工干預處理。

注意點:

  • RocketMQ的的默認延遲級別分爲16個,因此一條消息最大的重試次數爲16;

    // 源碼位置:org.apache.rocketmq.store.config.MessageStoreConfig.class
    // 如需修改,則須要修改broker的配置,官方並不建議修改
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
    //若是咱們在發送消息時設置消息的延遲級別爲3,則表示消息10s後才能被消費者發現
    msg.setDelayTimeLevel(3);
  • RocketMQ的Message中的reconsumeTimes屬性,表示該消息當前已重試的的次數,咱們能夠經過以下方法來控制最大重試的次數,超過最大重試次數的消息將移入死信隊列中。

    // 設置最多重試3次, 默認16
    consumer.setMaxReconsumeTimes(3);

    RocketMQ自身的重試機制,默認消息的初始延遲級別就爲3,好像並無法修改(不敢確定)。源碼以下:

    // 源碼位置:org.apache.rocketmq.client.impl.consumer.ProcessQueue.class
    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        ...
        pushConsumer.sendMessageBack(msg, 3);
        ...
    }

簡單的代碼示例

Producer

public class Producer {

    public static void main(String[] args) {
        // 初始化一個生產者,生產組爲simple_producer_group
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("simple_producer_group_01");
        // 設置NameServer地址
        defaultMQProducer.setNamesrvAddr("192.168.195.88:9876");
        try {
            // 啓動
            defaultMQProducer.start();

            Producer producer = new Producer();
            producer.syncSend(defaultMQProducer);
            producer.asyncSend(defaultMQProducer);
            producer.onewaySend(defaultMQProducer);
        } catch (Exception e) {
            log.error("異常:", e);
        } finally {
            defaultMQProducer.shutdown();
        }
    }

    /**
     * 同步方式
     */
    private void syncSend(DefaultMQProducer producer) throws InterruptedException, RemotingException,MQClientException, MQBrokerException, UnsupportedEncodingException {
        for(int i=0; i<100; i++){
            String messageBody = "syncSend message" + i ;
            Message msg = new Message("testTopic1", "*", 
                                      messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            /*
                設置消息延遲級別,默認有16個級別:
                1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                注意,若是設置爲3,意思是10s再被消費
             */
            //msg.setDelayTimeLevel(2);
            SendResult sendResult = producer.send(msg, 10000);
            log.info("onewaySend發送結果:{}", sendResult);
        }
    }

    /**
     * 異步方式
     */
    private void asyncSend(DefaultMQProducer producer) throws UnsupportedEncodingException, RemotingException, MQClientException, InterruptedException {
        for(int i=0; i<100; i++){
            String messageBody = "asyncSend message" + i ;
            Message msg = new Message("testTopic1", "*", 
                                      messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("asyncSend發送成功:{}", sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    log.info("asyncSend發送異常:{}", e);
                }
            });
        }
    }

    /**
     * 單向方式
     */
    private void onewaySend(DefaultMQProducer producer) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException {
        for(int i=0; i<100; i++){
            String messageBody = "onewaySend message" + i ;
            Message msg = new Message("testTopic1", "*", 
                                      messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(msg);
            log.info("onewaySend發送完成");
        }
    }
}

Consumer

public class Consumer {

    public static void main(String[] args) throws Exception {
        new Consumer().pushMode();
        //new Consumer().pullMode();

    }

    /**
     * push 消費模式
     */
    private void pushMode() throws MQClientException {
        // 設置消費組
        DefaultMQPushConsumer consumer 
            = new DefaultMQPushConsumer("simple_push_consumer_group_01");
        // 設置nameServer地址
        consumer.setNamesrvAddr("192.168.195.88:9876");
        // 設置從頭開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定topic
        consumer.subscribe("testTopic1", "*");
        // 設置批量消費消息的數量,默認1
        consumer.setConsumeMessageBatchMaxSize(1);
        // 設置最多重試3次, 默認16
        consumer.setMaxReconsumeTimes(3);
        // 註冊消息監聽
        consumer.registerMessageListener((List<MessageExt> msgs,
                                          ConsumeConcurrentlyContext context) -> {
            try {
                MessageExt msg = msgs.get(0);
               log.info("收到消息, body:{}, reconsumeTimes={}, delayTimeLevel={}",
                        new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), 
                        msg.getReconsumeTimes(), msg.getDelayTimeLevel());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.info("消費異常:", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        // 運行
        consumer.start();
        log.info("消費者啓動成功");
    }


    /** 用於pull模式中記錄offset使用*/
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();

    /**
     * pull 消費模式, 不建議使用
     */
    private void pullMode() throws MQClientException {
        DefaultMQPullConsumer consumer 
            = new DefaultMQPullConsumer("simple_pull_consumer_group_01");
        consumer.setNamesrvAddr("192.168.195.88:9876");
        consumer.start();
        // 獲取topic中的全部隊列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("testTopic1");
        for (MessageQueue mq : mqs) {
            log.info("消費隊列:{}", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    log.info("pullResult:{}", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            pullResult.getMsgFoundList().forEach(msgExt -> {
                                try {
                                    log.info("收到消息:{}", new String(msgExt.getBody(), 
                                                                 RemotingHelper.DEFAULT_CHARSET));
                                } catch (UnsupportedEncodingException e) {
                                    log.info("異常:", e);
                                }
                            });
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    log.info("異常:", e);
                }
            }
        }
    }

    /**
     * 獲取偏移量
     */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }

    /**
     * 保存偏移量
     */
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}

分佈式事務解決方案

分佈式事務

邏輯是這樣的:

  1. 事務發起方發送事務消息到MQ中,但此時該消息對方不可見
  2. 消息發送成功後,執行本地事務
  3. 根據本地事務執行結果,向MQ發送確認消息commit 或rollback;若是爲rollbalke,MQ則刪除事務消息不在下發,如爲commit,則事務消息變爲對方可見
  4. 對方消費事務消息,若是消費失敗則進行重試,重試仍未成功則須要人工處理
  5. 若是確認消息發送失敗,或者執行本地事務時事務發起方宕掉,這時MQ 將會不停的詢問其同組的其餘producer來獲取狀態。

實例代碼(Springboot項目):

@Component
@Slf4j
public class TransactionProducer implements InitializingBean {
    @Autowired
    private TransactionListener transactionListener;

    private TransactionMQProducer producer;

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("transaction_producer_group_01");
        producer.setNamesrvAddr("192.168.195.88:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
                r -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
        		});

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        try {
            producer.start();
        } catch (MQClientException e) {
            log.error("TransactionProducer 啓動異常:", e);
        }

        // 添加關閉鉤子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.shutdown()));
    }

    /**
     * 發送事務消息
     */
    public void produce() {
        try {
            String body= "transaction_test_1";
            Message msg = new Message("transactionTopicTest1", "user",
                                      UUID.randomUUID().toString(),
                                      body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            log.info("發送事務消息結果:{}", sendResult);
        } catch (MQClientException | UnsupportedEncodingException e) {
            log.info("發送事務消息異常:{}", e);
        }
    }

}
@Component
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
    @Resource(name = "jdbcTemplate1")
    private JdbcTemplate jdbcTemplate;

    /** 生產中能夠用redis或數據庫代替 */
    private ConcurrentHashMap<String, LocalTransactionState> localTrans 
        = new ConcurrentHashMap<>();

    /**
     * 執行本地事務
     * @param msg 事務消息
     * @param arg 自定義參數
     * @return 執行結果
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            jdbcTemplate.execute("insert into user(name) values('transaction_test_1')");
            localTrans.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
            log.info("本地事務執行成功");
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e){
            localTrans.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
            log.error("本地事務執行失敗");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        LocalTransactionState state = localTrans.get(msg.getTransactionId());
        log.info("執行事務回查,transactionId:{}, transactionState:{}", 
                 msg.getTransactionId(), state);
        if (null != state) {
            return state;
        }
        return LocalTransactionState.UNKNOW;
    }
}
@Component
@Slf4j
public class TransactionConsumer implements InitializingBean {
    @Resource(name = "jdbcTemplate2")
    private JdbcTemplate jdbcTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        DefaultMQPushConsumer consumer 
            = new DefaultMQPushConsumer("transaction_consumer_group_01");
        consumer.setNamesrvAddr("192.168.195.88:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("transactionTopicTest1", "user");
		// 只重試三次
        consumer.setMaxReconsumeTimes(3);
        consumer.registerMessageListener((List<MessageExt> msgs, 
                                          ConsumeConcurrentlyContext context) -> {
            try {
                MessageExt msg = msgs.get(0);
                log.info("收到消息:{}", msg);
                // 正式項目中,咱們應該保存該msgId,防止消息的重複消費
                // String msgId = msg.getMsgId();
                String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                jdbcTemplate.execute("insert into user(name) values('"+body+"')");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.info("異常:", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        log.info("TransactionConsumer啓動成功");
    }
}
/**
 * 數據源配置,用的Mysql和Druid
 */
@Configuration
public class DbConfig {

    @Bean(name = "dataSource1")
    DataSource dataSource1() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta1?useUnicode=true&characterEncoding=UTF8&useSSL=false");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        return dataSource;
    }

    @Bean(name = "dataSource2")
    DataSource dataSource2() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta2?useUnicode=true&characterEncoding=UTF8&useSSL=false");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        return dataSource;
    }

    @Bean("jdbcTemplate1")
    JdbcTemplate first(@Qualifier("dataSource1") DataSource dataSource) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate();
        jdbcTemplate.setDataSource(dataSource);
        return jdbcTemplate;
    }

    @Bean("jdbcTemplate2")
    JdbcTemplate second(@Qualifier("dataSource2") DataSource dataSource) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate();
        jdbcTemplate.setDataSource(dataSource);
        return jdbcTemplate;
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
    @Autowired
    TransactionProducer transactionProducer;

    /**
     * 測試分佈式事務
     */
    @Test
    public void testTransactionProducer() throws InterruptedException {
        transactionProducer.produce();
        Thread.sleep(10000000);
    }
}

其餘的一些東西

如下內容沒有主要研究,主要摘自官網,有須要時再詳細研究。

有序消息

原理和Kafka要作到有序差很少,即把消息放入一個隊列中,而後使用一個消費者去進行消費。

廣播消息

即每一個消費者消費都全部的消息

//設置廣播模式屬性便可
consumer.setMessageModel(MessageModel.BROADCASTING);

批量發送消息

批量發送消息可提升生產者的性能。注意的是:同一批次的消息應該具備:相同的topic,相同的waitStoreMsgOK和沒有延時計劃,而且一批消息的總大小不該超過1M。

消息過濾器

RocketMQ在消息過濾上是比較強大的,雖然咱們可能不會常常用到它。Message除了能夠Tag來加以區分外,咱們還能夠爲它添加額外的屬性。以下:

在發送消息時:

// 消息添加額外屬性
msg.putUserProperty("a", "3");

在消費消息時:

// 只有消費具備屬性a,a>= 0且a <= 3的消息
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

還有很是多的過濾方式如:in and or not is null = < > 等等

消息重複

RocketMQ不保證消息不重複,若是你的業務須要保證嚴格的不重複消息,須要你本身在業務端去重。

相關文章
相關標籤/搜索