RocketMQ Consumer接收消息流程

 這節介紹Consumer接收消息的流程,分爲Pull和Push模式。segmentfault

1. 初始化

 上一節講Rebalance時提到,Consumer接受客戶端有兩種方式:緩存

  1. Broker發現客戶端列表有變化,通知全部Consumer執行Rebalance
  2. Consumer定時每20秒自動執行Rebalance

其中1.的通知到達Consumer後,會當即觸發Rebalance,而後會重置2.的定時器等待時間。兩者最後通知Consumer的方式爲微信

  1. Push模式:當有新的Queue分配給客戶端時,會新包裝一個PullRequest,用於後續自動拉取消息,具體到DefaultMQPushConsumerImpl的executePullRequestImmediately方法
  2. Pull模式:回調DefaultMQPullConsumerImpl的MessageQueueListener有Queue發生改變

2. Push模式

 executePullRequestImmediately的內容爲:app

public void executePullRequestImmediately(final PullRequest pullRequest) {
    this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}

即將PullRequest對象傳給了PullMessageService的executePullRequestImmediately方法:框架

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

PullMessageService的結構以下:異步

file

內部維護着一個LinkedBlockingQueue屬性pullRequestQueue,用於存儲待處理的PullRequest;還有一個ScheduledExecutorService,用於延期處理PullRequest。具體流程以下:ide

file

  1. RebalanceImpl調用DefaultMQPushConsumerImpl的executePullRequestImmediately方法,傳入PullRequest
  2. DefaultMQPushConsumerImpl內部調用PullMessageService的executePullRequestImmediately方法,該方法會把傳入的PullRequest對象放到LinkedBlockingQueue中進行存儲,等待後續處理。
  3. PullMessageService會循環從隊列中出隊一個PullRequest,並調用自身的pullMessage用於後續處理。該動做會從MQClientInstance中選擇對應的客戶端實例DefaultMQPushConsumerImpl,並委託給它的pullMessage方法。
  4. DefaultMQPushConsumerImpl會先判斷當前請求是否知足條件,若是不知足條件,會調用PullMessage的executePullRequestLater方法,將當前請求延後處理。若是知足條件,會封裝一個PullCallback對象用於處理異步消息,並調用PullAPIWrapper異步請求Broker拉取消息。

從上面的過程能夠看出,Push模式內部仍是客戶端主動去拉取的,即所謂的封裝拉模式以實現推模式,簡單示意圖以下:fetch

file

內部經過PullMessageService循環的從PullRequest對應MessageQueue中主動拉取數據。this

2.1. DefaultMQPushConsumerImpl.pullMessage(PullRequest)

 該方法用於完成從MessageQueue拉取消息的過程,主要過程以下:spa

file

  1. 判斷該MessageQueue對應的PullRequest是否已經標記爲drop,若是是則直接返回
  2. 進行一系列的檢查,若是檢查不經過,則等待必定時間後再放回PullMessageService的待處理隊列中,主要是經過PullMessageService中的ScheduledExecutorService來作到延遲執行,涉及的狀況包括:

    1. 若是客戶端未準備就緒(DefaultMQPushCOnsumerImpl執行start後status爲RUNNING),則延遲PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)後再放回PullMessage的隊列中
    2. 若是是暫停狀態,則延遲PULL_TIME_DELAY_MILLS_WHEN_SUSPEND(1000)後再放回PullMessageService的等待隊列中
    3. 若是緩存的消息數大於配置的拉取線程數閾值(默認1000),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)後再返回等待隊列中處理
    4. 若是緩存的消息大小大於配置的拉取大小閾值(默認100M),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)後再返回等待隊列中處理
    5. 緩存的數據offset相差的偏移量超過設定值(默認2000),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)後再返回等待隊列中處理
    6. 若是沒有訂閱MessageQueue對應的topic,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)後再返回隊列中處理
  3. 包裝PullCallback對象,並調用PullAPIWrapper發起異步請求拉取消息

上面經過PullAPIWrapper收到結果後會將結果包裝爲PullResult對象並回調PullCallback。PullCallback和PullResult的定義以下:

public interface PullCallback {
    void onSuccess(final PullResult pullResult);
    
    void onException(final Throwable e);
}
public class PullResult {
    private final PullStatus pullStatus;//請求狀態
    private final long nextBeginOffset;//Broker返回的下一次開始消費的offset
    private final long minOffset;
    private final long maxOffset;
    private List<MessageExt> msgFoundList;//消息列表,一次請求返回一批消息
}

下面爲pullMessage方法處理異步返回結果的流程:

file

  1. 若是請求失敗,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)後再放回PullMessageService的待處理隊列中;處理成功則進入2.
  2. 調用PullAPIWrapper對結果進行預處理
  3. 根據請求狀態進行處理

    1. 有新消息(FOUND)

      1. 設置PullRequest下次開始消費的起始位置爲PullResult的nextBeginOffset
      2. 若是結果列表爲空則不延遲,立馬放到PullMessageService的待處理隊列中,不然進入3
      3. 將PullResult中的結果List<MessageExt>放入ProcessQueue的緩存中,並通知ConsumeMessageService處理
      4. 將該PullRequest放回待處理隊列中等待再次處理,若是有設置拉取的間隔時間,則等待該時間後再翻到隊列中等待處理,不然直接放到隊列中等待處理
    2. 沒有新消息(NO_NEW_MSG)

      1. 設置PullRequest下次開始消費的起始位置爲PullResult的nextBeginOffset
      2. 若是緩存的待消費消息數爲0,則更新offset存儲
      3. 將PullRequest立馬放到PullMessageService的待處理隊列中
    3. 沒有匹配的消息(NO_MATCHED_MSG)

      1. 設置PullRequest下次開始消費的起始位置爲PullResult的nextBeginOffset
      2. 若是緩存的待消費消息數爲0,則更新offset存儲
      3. 將PullRequest立馬放到PullMessageService的待處理隊列中
    4. 不合法的偏移量(OFFSET_ILLEGAL)

      1. 設置PullRequest下次開始消費的起始位置爲PullResult的nextBeginOffset
      2. 標記該PullRequset爲drop
      3. 10s後再更新並持久化消費offset;再通知Rebalance移除該MessageQueue

 下面先介紹下ProcessQueue,這裏只標識幾個相關的屬性:

public class ProcessQueue {
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    //緩存的待消費消息,按照消息的起始offset排序
    private final TreeMap</*消息的起始offset*/Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    //緩存的待消費消息數量
    private final AtomicLong msgCount = new AtomicLong();
    //緩存的待消費消息大小
    private final AtomicLong msgSize = new AtomicLong();
    private final Lock lockConsume = new ReentrantLock();
    /**
     * A subset of msgTreeMap, will only be used when orderly consume
     */
    private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
    private volatile long queueOffsetMax = 0L;
    private volatile boolean dropped = false;
    //最近執行pull的時間
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    //最近被客戶端消費的時間
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    private volatile boolean locked = false;
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    //當前是否在消費,用於順序消費模式,對並行消費無效
    private volatile boolean consuming = false;
    private volatile long msgAccCnt = 0;
}

ProcessQueue展現了MessageQueue的消費狀況。上面提到,發起pull請求後若是有數據,會先放到ProcessQueue的緩存中,即msgTreeMap屬性,於是緩存的消息會按照消息的起始offset被排序存儲。經過ProcessQueue能夠查看MessageQueue當前的處理狀況,ProcessQueue還用於輔助實現順序消費。

2.2 ConsumeMessageService

 異步返回的消息內容將交給ConsumeMessageService處理,ConsumeMessageService是個接口,方法定義以下:

public interface ConsumeMessageService {
    void start();

    void shutdown();

    void updateCorePoolSize(int corePoolSize);

    void incCorePoolSize();

    void decCorePoolSize();

    int getCorePoolSize();

    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);

    void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume);
}

經過定義可見,要求實現類提供異步處理的功能。內部提供的實現類有:

file

ConsumeMessageConcurrentlyService:並行消費;ConsumeMessageOrderlyService:順序消費,這裏重點看ConsumeMessageConcurrentlyService。異步請求後會將拉取的新消息列表交給submitConsumeRequest方法處理,以下:

file

該方法會將傳入的消息列表分裝爲一個ConsumeRequest,並提到到線程池中等待處理。若是傳入的消息列表長度超過設定值(默認爲1),則會分多個批處理。

 在介紹消費具體過程以前先回顧客戶端啓動流程的Demo,接收消息的寫法以下:

public class Consumer {

    public static void main (String[] args) throws InterruptedException, MQClientException {

        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");

        // 設置NameServer的地址
        consumer.setNamesrvAddr ("localhost:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾須要消費的消息
        consumer.subscribe ("TopicTest", "*");
        // 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啓動消費者實例
        consumer.start ();
        System.out.printf ("Consumer Started.%n");
    }
}

其中註冊了一個MessageListenerConcurrently,該類將用於用戶端處理消息。

 回過來看ConsumeRequest,該類實現了Runnable接口,會在run方法完成主要的處理工做,主要動做爲:

  1. 調用DefaultMQPushConsumerImpl.executeHookBefore執行前置hook動做
  2. 調用MessageListenerConcurrently.consumeMessage通知用戶端處理消息,即上面demo內容,會返回處理結果ConsumeConcurrentlyStatus
  3. 調用DefaultMQPushConsumerImpl.executeHookAfter執行後置hook動做
  4. ConsumeMessageConcurrentlyService.processConsumeResult根據ConsumeConcurrentlyStatus執行收尾動做

2.2.1. MessageListenerConcurrently.consumeMessage

 用戶真正接收消息並執行處理動做的地方,須要返回ConsumeConcurrentlyStatus告知框架處理結果。這裏在方法裏最好不要作耗時長的任務,快速處理後返回給框架結果,避免消息堆積在線程池中。能夠將消息內容複製一遍後再放到線程池中進行分發處理。

2.2.2. ConsumeMessageConcurrentlyService.processConsumeResult

 該方法主要在用戶消費完數據後進行收尾動做,過程以下:

file

ConsumerRequest在run方法的開始處,實例化了一個ConsumeConcurrentlyContext對象,用於後續傳遞內容,該定義爲:

public class ConsumeConcurrentlyContext {
    private final MessageQueue messageQueue;
    //重試的延遲級別,-1:不重試;0:由broker控制;>0由客戶端控制
    private int delayLevelWhenNextConsume = 0;
    //消息列表最後一個正常消費的消息索引號
    private int ackIndex = Integer.MAX_VALUE;
}

其中ackIndex表示最後一個正常消費的消息索引號(0從開始,0~ackIndex爲正常消費),該位置後的消息表示無法正常消費。該值由用戶端控制,能夠經過ackIndex來控制須要重發的消息。

 ackIndex默認值爲Integer.MAX_VALUE,若是爲該值則認爲全部消息正常消費,不存在錯誤。上面流程中統計成功和失敗也是根據ackIndex來判斷的,對於ackIndex後的消息,若是是集羣消費模式,則會先嚐試發送回broker,由broker控制重試時機;若是重試失敗,會收集這些失敗的消息,延遲5秒後再調用一次ConsumeMessageService.submitConsumeRequest讓用戶端再次處理。最後會將處理成功的消息從ProcessQueue中移除,更新緩存,而後將q消費的偏移量記錄下來,等待後臺線程同步到broker或者本地。

 綜合上面的介紹,Push模式下的處理流程大體以下:

file

Push模式經過PullMessageService循環從監聽的MessageQueue中以Pull模式拉取消息,並分發給用戶註冊的MesageListenerConsurrently對象處理,完了以後會自動處理消息的重試,offset更新等動做,從而模擬消息從Broker端主動推進過來。

2. Pull模式

 同Push模式同樣,Pull模式的觸發也是經過Rebalance,以下:

file

同開頭說起的同樣,會回調DefaultMQPullConsumerImpl的MessageQueueListener有Queue發生改變。

 系統提供了MQPullConsumerScheduleService,能夠定時以Pull模式拉取消息,並將結果通知MessageQueueListener,內部的實現爲:

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {//mqAll該topic下的全部q,mqDivided該實例分配到的q
        MessageModel messageModel =
            MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                MQPullConsumerScheduleService.this.putTask(topic, mqAll);//通知該topic下的監聽器,最新的全部q
                break;
            case CLUSTERING:
                MQPullConsumerScheduleService.this.putTask(topic, mqDivided);//通知該topic下的監聽器,該實例分配的q
                break;
            default:
                break;
        }
    }
}

putTask會將分配到的新的MessageQueue包裝爲一個PullTaskImpl,PullTaskImpl實現了Runnable,會在後臺一直執行;而將不屬於本身處理的MessageQueue對應的PullTaskImpl停掉。PullTaskImpl會查找該MessageQueue所監聽topic對應的處理類PullTaskCallback,調用doPullTask,將具體動做讓用戶處理。

 MQPullConsumerScheduleService的例子爲:

public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {//註冊topic的監聽器

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                try {

                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//上報消費的offset,消費完後要主動上報

                    context.setPullNextDelayTimeMillis(100);//設置下一次觸發間隔
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}

 也能夠本身手動執行pull,以下面的例子:

public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

 相較於Push模式,Pull模式則須要用戶本身控制消息的重試,offset更新等動做。下面附上該部分當時源碼閱讀過程作的筆記簡圖:

file

更多原創內容請搜索微信公衆號:啊駝(doubaotaizi)
相關文章
相關標籤/搜索