RocketMQ系列之push(推)消息模式(五)

Eureka精品源碼
web

Quartz系列全集
算法

xxl-job系列apache

sharding-jdbc精品源碼合集
緩存

構建sleuth+zipkin全鏈路監控系統(完結篇)
微信


前言

本文詳細介紹了rocketMQ中的推模型,看完這片文章,你會對push模型有很是很是深刻的理解。架構

推消息的經常使用示例app

 1public class Consumer {
2
3    public static void main(String[] args) throws InterruptedException, MQClientException {
4
5        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
6
7        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
8
9        consumer.subscribe("TopicTest""*");
10        consumer.registerMessageListener(new MessageListenerConcurrently() {
11
12            @Override
13            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
14                ConsumeConcurrentlyContext context)
 
{
15                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
16                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
17            }
18        });
19
20        consumer.start();
21
22        System.out.printf("Consumer Started.%n");
23    }
24}

推模式和拉模式在使用上有很大的不一樣,其中實現機制最大的不一樣在於DefaultMQPushConsumer負載均衡

咱們首先來看一下rocketMq推拉模式實現的主要類圖異步

rokcetMq消費者消息模式類圖

其實MQConsumer上面還有一個頂級類MQAdmin,沒有畫出來。ide

首先咱們看一下MQConsumer的代碼實現

 1/**
2 * Message queue consumer interface
3 */

4public interface MQConsumer extends MQAdmin {
5    /**
6     * If consuming failure,message will be send back to the brokers,and delay consuming some time
7     */

8    @Deprecated
9    void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
10        MQBrokerException, InterruptedException, MQClientException
;
11
12    /**
13     * 若是消費失敗,消息會被從新發送到borker, 而且延遲消費。
14     */

15    void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
16        throws RemotingException, MQBrokerException, InterruptedException, MQClientException
;
17
18    /**
19     * 獲取消息隊列從訂閱的topic上
20     *
21     * @param topic message topic
22     * @return queue set
23     */

24    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
25}

PullMessageService

介紹拉模式的消息,那麼必需要講一下PullMessageService , 該類是一個線程類,繼承了ServiceThread這個對象,其實就是一個線程。

下面看一下ServiceThread 的大體實現,隱藏了一些暫時不須要了解的代碼,否則代碼太長了

 1public abstract class ServiceThread implements Runnable {
2        private Thread thread;
3    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
4    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
5    protected volatile boolean stopped = false;
6    protected boolean isDaemon = false;
7
8    //Make it able to restart the thread
9    private final AtomicBoolean started = new AtomicBoolean(false);
10
11    public ServiceThread() {
12
13    }
14
15    public abstract String getServiceName();
16
17    public void start() {
18        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);    
19          // 更新啓動狀態,看是否重複啓動
20        if (!started.compareAndSet(falsetrue)) {
21            return;
22        }
23
24          // 默認是不中止的
25        stopped = false;
26          // 初始化一個線程,runnable就是當前的類
27        this.thread = new Thread(this, getServiceName());
28          // 是不是守護線程, 默認不是守護線程,JVM關閉的時候,須要等當前線程執行完畢
29        this.thread.setDaemon(isDaemon);
30          // 啓動啦。。。
31        this.thread.start();
32    }
33}

看一下PullMessageService 的大體實現

 1public class PullMessageService extends ServiceThread {
2    // 省略....
3  @Override
4    public void run() {
5        log.info(this.getServiceName() + " service started");
6
7        while (!this.isStopped()) {
8            try {
9                  // 從隊列裏面獲取請求信息
10                PullRequest pullRequest = this.pullRequestQueue.take();
11               // 獲取消息
12                this.pullMessage(pullRequest);
13            } catch (InterruptedException ignored) {
14            } catch (Exception e) {
15                log.error("Pull Message Service Run Method exception", e);
16            }
17        }
18
19        log.info(this.getServiceName() + " service end");
20    }
21  // 省略....
22}

大體的代碼層次就是下面這樣子的

------Runnable

----------ServiceThread

--------------PullMessageService

下面看一下MQClientInstance初始化建立的時候的代碼,刪除了一些不屬於本章節的內容。

 1public void start() throws MQClientException {
2        synchronized (this) {
3            switch (this.serviceState) {
4                case CREATE_JUST:
5                            // 省略源碼------
6                    // 開啓pullService , 針對consumer
7                    this.pullMessageService.start();
8                           // 省略源碼------
9                    break;
10                case RUNNING:
11                    break;
12                case SHUTDOWN_ALREADY:
13                    break;
14                case START_FAILED:
15                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed."null);
16                default:
17                    break;
18            }
19        }
20    }

在MQClientInstance啓動的時候,會調用pullMessageService的start的方法,其實就是調用到了ServiceThread的start方法。

好了,上面咱們已經清楚了什麼是PullMessageService, 在這裏作一下總結。

PullMessageService 就是一個後臺線程,消費者啓動這個線程用來獲取去borker獲取消息的。

這個類裏面有個很重要的東西,就是一個阻塞隊列

1public class PullMessageService extends ServiceThread {
2    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
3

這個隊列很是重要,該隊列裏面存儲的是有去獲取消息的請求, 這個時候,就有人連環三問了

什麼時間

什麼地點

什麼人

會往pullRequestQueue 裏面放元素呢?

OK,帶着這個問題,筆者調試了一波代碼,一頓操做猛如虎,總算搞了個頭緒出來。

  1. 程序啓動的時候, 首先往這個隊列裏面放元素的是負載均衡器,org.apache.rocketmq.client.impl.consumer.RebalanceImpl

,負載均衡器會根據負載均衡算法計算出當前消費者須要消費的隊列, 每個隊列構建一個pullRequest , 而後放入隊列

  1. 每次請求消息以後,會把pullRequest 從新放入隊列, 以供接下來繼續請求

  2. 程序在運行過程當中,負載均衡器檢測到有新增長的隊列,那麼會把新增長的隊列做爲一個pullRequest 放入隊列

提問:pullRequest何時會被移除?

答:pullRequest裏面有一個很重要的概念ProcessQueue , 這個是請求處理隊列,該隊列裏面會維護好一個 dropped 變量,當變成false的時候,這個pullRequest從對立裏面取出來以後,就不會再放回去了,至關於被移除了。 通常在Rebalance 的時候進行移除換句話說,其實就是當消費者數量發生變化或者topic隊列發生變化的時候,經過負載均衡器從新進行負載的時候,若是這個queue再也不被本Consumer消費,那麼其所對應的pullRequest會被移除

 1public class PullRequest {
2      // 當前消費組
3        private String consumerGroup;
4      // 消息隊列信息
5        private MessageQueue messageQueue;
6      // 接下來要消費的offSet進度
7    private long nextOffset;
8      // 當前隊列信息
9    private ProcessQueue processQueue;
10    private boolean lockedFirst = false;
11}

那麼接下來看一下push模式的消息拉取是什麼樣子的。

消息拉取

上一個段落裏面,pullMessageService裏面從隊列獲取了一個pullRequest , 而後調用了pullMessage 方法,下面看一下這個方法

org.apache.rocketmq.client.impl.consumer.PullMessageService

 1private void pullMessage(final PullRequest pullRequest) {
2              // 從MQClientFactory裏面獲取消費者信息
3        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
4        if (consumer != null) {
5              // 這裏強轉給PushConsumer, 若是不是的話,那麼確定就報錯了,因此這裏作了一個限制
6            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
7              // 走一把
8            impl.pullMessage(pullRequest);
9        } else {
10            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
11        }
12    }

步驟說明:

  1. 從MQClientFactory裏面獲取消費者信息, 這個在DefaultMQPushConsumer 初始化的時候,註冊進去的。當時註冊的是DefaultMQPushConsumerImpl

  2. 強轉爲DefaultMQPushConsumerImpl, 若是不是的話,那麼確定就報錯了,因此這裏作了一個限制

  3. 調用DefaultMQPushConsumerImplpullMessage方法

pullMessage

源碼入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

 1public void pullMessage(final PullRequest pullRequest) {
2        // 從pullRequest中獲取處理隊列,每一個RocketMq的queue被消費者消費的時候,都會指定一個pullRequest ,
3        // 那麼這個queue拉取到的緩存消息,消息統計等信息,都在`ProcessQueue`裏面被包含着。
4        final ProcessQueue processQueue = pullRequest.getProcessQueue();
5        // 判斷當前pullRequest是否被丟棄
6        if (processQueue.isDropped()) {
7            log.info("the pull request[{}] is dropped.", pullRequest.toString());
8            return;
9        }
10        // 設置pullRequest處理隊列最後一次的處理時間
11        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
12
13        try {
14            //   檢查服務的狀態是否OK, 也就是是否在Running狀態
15            this.makeSureStateOK();
16        } catch (MQClientException e) {
17            log.warn("pullMessage exception, consumer state not ok", e);
18            // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
19            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
20            return;
21        }
22        // 消費者是否被暫停,若是被暫停的話,則暫時返回
23        if (this.isPause()) {
24            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}"this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
25            // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
26            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
27            return;
28        }
29        // 當前pullRequest總共緩存了多少消息
30        long cachedMessageCount = processQueue.getMsgCount().get();
31        // 計算緩存消息的大小, 以MB爲單位
32        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
33        // pullThresholdForQueue = 1000  , 緩存在消費端的消息最多爲1000條,若是大於,那麼此次就放棄拉取,等待下次
34        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
35            // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
36            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
37            //緩存消息的數量超過了閾值, 流量控制也是, 這裏的意思其實就是說,
38            // 這個消費者不是一次兩次緩存消息超過數量了,而是超過了1000以上了,就是消費者消費的太慢了,大家發的太快了,打印一把日誌
39            if ((queueFlowControlTimes++ % 1000) == 0) {
40                log.warn(
41                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
42                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
43            }
44            return;
45        }
46        // 緩存消息大小是否超過100MB,pullThresholdSizeForQueue = 100 ,
47        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
48            // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
49            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
50            //緩存消息的數量超過了閾值, 流量控制也是, 這裏的意思其實就是說,
51            // 這個消費者不是一次兩次緩存消息大小超過100MB了,而是超過了1000以上了,就是消費者消費的太慢了,大家發的太快了,消息太大了,打印一把日誌
52            if ((queueFlowControlTimes++ % 1000) == 0) {
53                log.warn(
54                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
55                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
56            }
57            return;
58        }
59        // 消費端是不是排序消費
60        if (!this.consumeOrderly) {
61            // 判斷消息是否跨過了最大的消息跨度 , 經過offset的大小, 緩存的消息 offset的最大跨度不能夠超過 consumeConcurrentlyMaxSpan = 2000
62            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
63                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
64                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
65                    log.warn(
66                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
67                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
68                        pullRequest, queueMaxSpanFlowControlTimes);
69                }
70                return;
71            }
72        } else {
73            // 若是是排序消費,那麼pullRequest就是須要上鎖了,只容許一我的同時操做隊列,防止亂序
74            if (processQueue.isLocked()) {
75                if (!pullRequest.isLockedFirst()) {
76                    // 計算順序消費的話,從哪裏開始消費爲準, 由於順序消費的話在下面的
77                     // PullCallback onSuccess方法裏面,存在一個dispatchToConsume參數
78                    // 若是dispatchToConsume = false的話,那麼消息就不會被丟到線程池被後臺線程消費
79                    // ,僅在PullRequest的處理隊列裏面
80                    final long offset = this.rebalanceImpl.computePullFromWhere
81                    (pullRequest.getMessageQueue());
82                    boolean brokerBusy = offset < pullRequest.getNextOffset();
83                    log.info("the first time to pull message, so fix offset from broker. 
84                             pullRequest: {} NewOffset: {} brokerBusy: {}"
,
85                        pullRequest, offset, brokerBusy);
86                    if (brokerBusy) {
87                        log.info("[NOTIFYME]the first time to pull message, but pull request
88                        offset larger than broker consume offset. pullRequest: {} NewOffset: {}"
,
89                            pullRequest, offset);
90                    }
91                    // 設置下次拉取的offSet
92                    pullRequest.setLockedFirst(true);
93                    pullRequest.setNextOffset(offset);
94                }
95            } else {
96                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
97                log.info("pull message later because not locked in broker, {}", pullRequest);
98                return;
99            }
100        }
101
102        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().
103          get(pullRequest.getMessageQueue().getTopic());
104        if (null == subscriptionData) {
105            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
106            log.warn("find the consumer's subscription failed, {}", pullRequest);
107            return;
108        }
109
110        final long beginTimestamp = System.currentTimeMillis();
111                // 消息結果回調
112        PullCallback pullCallback = new PullCallback() {
113            @Override
114            public void onSuccess(PullResult pullResult) {
115                if (pullResult != null) {
116                      // 獲取pullResult
117                    pullResult = DefaultMQPushConsumerImpl.this.
118                      pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
119                        subscriptionData);
120                                        // 消息結果狀態
121                    switch (pullResult.getPullStatus()) {
122                        case FOUND: // 找到消息了
123                            long prevRequestOffset = pullRequest.getNextOffset();
124                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
125                            long pullRT = System.currentTimeMillis() - beginTimestamp;
126                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
127                              incPullRT(pullRequest.getConsumerGroup(),
128                                pullRequest.getMessageQueue().getTopic(), pullRT);
129
130                            long firstMsgOffset = Long.MAX_VALUE;
131                            // 判斷消息結果是否爲空,
132                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFou
133                            ndList().isEmpty()
{
134                                // 爲空就等待下次繼續拉取
135                                            DefaultMQPushConsumerImpl.this.
136                                                                    executePullRequestImmediately(pullRequest);
137                            } else {
138                                // 不爲空,獲取第一條消息的firstMsgOffset
139                                firstMsgOffset = pullResult.getMsgFoundList().get(0).
140                                getQueueOffset();
141
142                                // 這裏貌似是添加消息的統計信息, TPS之類的
143                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
144                                  incPullTPS(pullRequest.getConsumerGroup(), pullRequest.
145                                    getMessageQueue().getTopic(), pullResult.getMsgFoundList()
146                                    .size());
147                                // 先將本次拉取到的消息放入到pullRequest處理隊列裏面去
148                                boolean dispatchToConsume = processQueue.putMessage(pullResult.
149                                getMsgFoundList());
150
151                                // 來來來,哥幾個,這裏就是重點了,將消息放入後臺消費者的線程池並行消費。
152                                // dispatchToConsume 這個參數僅針對順序消費的場景,並行消費的場景不用這個參數
153                                DefaultMQPushConsumerImpl.this.consumeMessageService.
154                                submitConsumeRequest(
155                                    pullResult.getMsgFoundList(),
156                                    processQueue,
157                                    pullRequest.getMessageQueue(),
158                                    dispatchToConsume);
159
160                                // 下次獲取消息的時間間隔,pullInterval = 0 ,默認值
161                                if (DefaultMQPushConsumerImpl.this.
162                                defaultMQPushConsumer.getPullInterval() > 0) {
163                                    // 若是大於0 ,則延遲獲取
164                                    DefaultMQPushConsumerImpl.this.
165                                    executePullRequestLater(pullRequest,
166                                        DefaultMQPushConsumerImpl.this.
167                                        defaultMQPushConsumer.getPullInterval());
168                                } else {
169                                    // 直接塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
170                                    DefaultMQPushConsumerImpl.this.
171                                    executePullRequestImmediately(pullRequest);
172                                }
173                            }
174
175                            if (pullResult.getNextBeginOffset() < prevRequestOffset
176                                || firstMsgOffset < prevRequestOffset) {
177                                log.warn(
178                                    "[BUG] pull message result maybe 
179                                    data wrong, nextBeginOffset: {} 
180                                    firstMsgOffset: {} prevRequestOffset: {}"
,
181                                    pullResult.getNextBeginOffset(),
182                                    firstMsgOffset,
183                                    prevRequestOffset);
184                            }
185
186                            break;
187                        case NO_NEW_MSG: // 沒有新消息
188                            // 沒有新消息,將pullRequest塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
189                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
190
191                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
192
193                            DefaultMQPushConsumerImpl.this.
194                            executePullRequestImmediately(pullRequest);
195                            break;
196                        case NO_MATCHED_MSG: // 沒有找到匹配的消息
197                            // 沒有新消息,將pullRequest塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
198                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
199
200                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
201
202                            DefaultMQPushConsumerImpl.this.
203                            executePullRequestImmediately(pullRequest);
204                            break;
205                        case OFFSET_ILLEGAL:
206                            // offset錯誤,
207                            log.warn("the pull request offset illegal, {} {}",
208                                pullRequest.toString(), pullResult.toString());
209                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
210                            // pullRequest設置爲丟棄,後面不獲取消息了
211                            pullRequest.getProcessQueue().setDropped(true);
212                            // 延遲執行,10秒鐘
213                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
214
215                                @Override
216                                public void run() {
217                                    try {
218                                        DefaultMQPushConsumerImpl.this.offsetStore.
219                                        updateOffset(pullRequest.getMessageQueue(),
220                                            pullRequest.getNextOffset(), false);
221
222                                        DefaultMQPushConsumerImpl.this.offsetStore.
223                                        persist(pullRequest.getMessageQueue());
224                                        // 移除
225                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.
226                                        removeProcessQueue(pullRequest.getMessageQueue());
227
228                                        log.warn("fix the pull request offset, {}", pullRequest);
229                                    } catch (Throwable e) {
230                                        log.error("executeTaskLater Exception", e);
231                                    }
232                                }
233                            }, 10000);
234                            break;
235                        default:
236                            break;
237                    }
238                }
239            }
240
241            @Override
242            public void onException(Throwable e) {
243                // 若是出現異常的話,走這個地方
244                if (!pullRequest.getMessageQueue().getTopic().
245                startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
246                    log.warn("execute the pull request exception", e);
247                }
248                // 將消息請求再次放入隊列,只不過放入的是須要延遲一會發送的
249                DefaultMQPushConsumerImpl.this.
250                executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
251            }
252        };
253        // 獲取已提交offSet值
254        boolean commitOffsetEnable = false;
255        long commitOffsetValue = 0L;
256        // 判斷是否爲集羣消費,由於廣播消費的offSet不提交到broker上面去, 這個是爲了去borker上面獲取消息用的
257        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
258            // 從消息存儲器裏面獲取
259            commitOffsetValue = this.offsetStore.
260            readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
261            if (commitOffsetValue > 0) {
262                commitOffsetEnable = true;
263            }
264        }
265
266        String subExpression = null;
267        boolean classFilter = false;
268        // 從負載均衡器裏面獲取消費組的信息
269        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().
270        get(pullRequest.getMessageQueue().getTopic());
271        if (sd != null) {
272            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull()
273            && !sd.isClassFilterMode()) {
274                subExpression = sd.getSubString();
275            }
276
277            classFilter = sd.isClassFilterMode();
278        }
279        // 構建系統flag
280        int sysFlag = PullSysFlag.buildSysFlag(
281            commitOffsetEnable, // commitOffset
282            true// suspend
283            subExpression != null// subscription
284            classFilter // class filter
285        );
286        try {
287            // 拉取消息
288            this.pullAPIWrapper.pullKernelImpl(
289                pullRequest.getMessageQueue(),
290                subExpression,
291                subscriptionData.getExpressionType(),
292                subscriptionData.getSubVersion(),
293                // 須要消費的offSet起始值
294                pullRequest.getNextOffset(),
295                // 一次獲取消息多少的大小
296                this.defaultMQPushConsumer.getPullBatchSize(),
297                sysFlag,
298                commitOffsetValue,
299                // broker端阻塞的時間15秒,默認
300                BROKER_SUSPEND_MAX_TIME_MILLIS,
301                //過時時間30秒,默認
302                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
303                // 默認爲異步 , 獲取結果後回調pullCallback的onSuccess方法
304                CommunicationMode.ASYNC,
305                pullCallback
306            );
307        } catch (Exception e) {
308            log.error("pullKernelImpl exception", e);
309            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
310        }
311    }

上面一大坨的代碼,有些朋友基本上就慌了,可能瞬間就下關掉這篇文章了,其實上面的代碼個人註釋基本上寫的很是清楚了,下面用文字總結一下

步驟說明:

  1. pullRequest中獲取處理隊列,每一個RocketMq的queue被消費者消費的時候,都會指定一個pullRequest ,
    那麼這個queue拉取到的緩存消息,消息統計等信息,都在ProcessQueue裏面被包含着。

  2. 設置pullRequest處理隊列最後一次的處理時間

  3. 檢查服務的狀態是否OK, 也就是是否在Running狀態

  4. rocketMq的流量控制,push模式下的,總共分爲三種流控模式, 在講流控模式以前,首先咱們要知道一點,全部的消息獲取過來的時候都是先放到ProcessQueue 裏面去的,也就是說ProcessQueue 緩存了全部未處理的消息。

  5. 緩存消息數量控制 ,pullThresholdForQueue = 1000  , 緩存在消費端的消息最多爲1000條,若是大於,那麼此次就放棄拉取,等待下次

  6. 緩存消息大小控制,緩存消息大小是否超過100MB,pullThresholdSizeForQueue = 100  ,若是大於100MB,那麼此次就放棄拉取,等待下次拉取

  7. 針對並行消費(普通消息), 須要控制隊列的offSet的跨度, 跨度不能夠超過2000,

  8. 針對集羣消費,獲取已提交offSet值,由於廣播消費的offSet不提交到broker上面去, 這個是爲了去borker上面獲取消息用的

  9. 獲取消息,調用pullAPIWrapper.pullKernelImpl 方法,該方法有幾個比較重要的參數,

     1// 拉取消息
    2this.pullAPIWrapper.pullKernelImpl(
    3         // 請求的隊列
    4     pullRequest.getMessageQueue(),
    5         // 訂閱組的信息(className)
    6     subExpression,
    7         // 表達式類型:TAG類型
    8     subscriptionData.getExpressionType(),
    9         //訂閱組的版本(時間戳)
    10     subscriptionData.getSubVersion(),
    11     // 須要消費的offSet起始值
    12     pullRequest.getNextOffset(),
    13     // 一次獲取消息多少的大小
    14     this.defaultMQPushConsumer.getPullBatchSize(),
    15     sysFlag,
    16         // 已經提交的offSet值
    17     commitOffsetValue,
    18     // broker端阻塞的時間15秒,默認
    19     BROKER_SUSPEND_MAX_TIME_MILLIS,
    20     //過時時間30秒,默認
    21     CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
    22     // 默認爲異步 , 獲取結果後回調pullCallback的onSuccess方法
    23     CommunicationMode.ASYNC,
    24     pullCallback
    25);

    BROKER_SUSPEND_MAX_TIME_MILLIS : 15秒,這個其實就是push模式的實現,當請求到達borker的時候,若是發現沒有消息,會阻塞15秒鐘,由這個參數控制,

    pullCallback : 獲取到結果以後,會進行結果回調,調用onSuccess方法

  10. pullCallback 的結果回調 , 結果回調狀態總共有下面幾種

     1public enum PullStatus {
    2   /**
    3    * 消息找到了
    4    */

    5   FOUND,
    6   /**
    7    * 沒有新消息過來
    8    */

    9   NO_NEW_MSG,
    10   /**
    11    * 沒有匹配到的消息
    12    */

    13   NO_MATCHED_MSG,
    14   /**
    15    * 錯誤的offSet值,太大了或者過小了
    16    */

    17   OFFSET_ILLEGAL
    18}

    咱們主要講消息找到了的那種場景。

  11. FOUND消息狀態

     1switch (pullResult.getPullStatus()) {
    2     case FOUND: // 找到消息了
    3       long prevRequestOffset = pullRequest.getNextOffset();
    4       pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    5       long pullRT = System.currentTimeMillis() - beginTimestamp;
    6       DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
    7         incPullRT(pullRequest.getConsumerGroup(),
    8                   pullRequest.getMessageQueue().getTopic(), pullRT);
    9       long firstMsgOffset = Long.MAX_VALUE;
    10       // 判斷消息結果是否爲空,
    11        if (pullResult.getMsgFoundList() == null||pullResult.getMsgFoundList().isEmpty()) {
    12         // 爲空就等待下次繼續拉取
    13         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    14       } else {
    15         // 不爲空,獲取第一條消息的firstMsgOffset
    16         firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    17         // 這裏貌似是添加消息的統計信息, TPS之類的
    18         DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
    19           incPullTPS(pullRequest.getConsumerGroup(),
    20                      pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    21         // 先將本次拉取到的消息放入到pullRequest處理隊列裏面去
    22         boolean dispatchToConsume = processQueue.putMessage(pullResult.
    23                                                             getMsgFoundList());
    24         DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    25           pullResult.getMsgFoundList(),
    26           processQueue,
    27           pullRequest.getMessageQueue(),
    28           dispatchToConsume);
    29         // 下次獲取消息的時間間隔,pullInterval = 0 ,默認值
    30         if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()>0) {
    31           // 若是大於0 ,則延遲獲取
    32           DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
    33                                   DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    34         } else {
    35              // 直接塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
    36              DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    37            }
    38          }
    39          if (pullResult.getNextBeginOffset() < prevRequestOffset
    40              || firstMsgOffset < prevRequestOffset) {
    41            log.warn(
    42              "[BUG] pull message result maybe data wrong
    43              , nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}"
    ,
    44              pullResult.getNextBeginOffset(),
    45              firstMsgOffset,
    46              prevRequestOffset);
    47          }
    48    break;
    49  default:
    50  break;
    51}

    步驟說明:

    1.判斷消息結果是否爲空,爲空就等待下次繼續拉取,不爲空,則繼續處理消息
    2.調用processQueue.putMessage方法,先將本次拉取到的消息放入到pullRequest處理隊列裏面去,processQueue維護了一個TreeMap , key = offSet , value爲消息。
    3.putMessage方法中,會維護兩個變量來針對順序消費, consuming : 是否在被消費,dispatchToConsume : 是否須要分發給消費者消息處理現場

     1 // 順序消費
    2 // 線路1:  第一次來了  10 條消息, 設置 dispatchToConsume = true , consuming = true
    3 //          dispatchToConsume = true , 那麼放入一個請求到線程池裏面,
    4 // 線路2:   又來了10 條消息 , 發現consuming = true, 正在消費,則返回dispatchToConsume = false
    5 // 線路2:   線路2的10條消息不會往線程池裏面丟request
    6
    7 // 線路1:  開始讀取processQueue裏面的消息,
    8
    9 // 線路3:   又來了10 條消息 , 發現consuming = true, 正在消費,則返回dispatchToConsume = false
    10 // 線路3:   線路3的10條消息不會往線程池裏面丟request
    11
    12 // 線路1: 讀取完畢線程迴歸,設置consuming = false
    13
    14 // 線路4:   又來了10 條消息 , 發現consuming = false, 沒有在消費,則返回dispatchToConsume = true
    15 // 線路4:   線路4的會往線程池裏面建立一個Request
    16
    17 // 綜上所述: 順序消費有且僅會建立一個線程 。 從而保證順序消費。經過pullRequest 裏面的ProcessQueue 的treeMap ,實現offset最小的先消費

    4.將消息submit到線程池裏面去,至此消息拉取完畢

    總結:
    RocketMq的推模式不是嚴格意義上的推,是經過後臺啓動異步線程,一個queue構建一個pullRequest, 異步的去請求的拉模式,只不過是經過broker端阻塞(默認阻塞15秒)的方法,達到了推模式的效果。其實就是長輪詢模式,哈哈

    同時,rocketMq經過流量控制模塊(消息數量(1000),消息大小(100MB),offSet跨度(並行消費))來控制消費者這一邊的壓力,不至於消費慢,被後臺異步線程給壓死。


歡迎關注個人微信公衆號:【sharedCode】



回覆:「資源」、「架構、「開發手冊等關鍵詞獲取海量免費學習資料。



本文分享自微信公衆號 - sharedCode(sharedCode)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索