Eureka精品源碼
web
xxl-job系列apache
前言
本文詳細介紹了rocketMQ中的推模型,看完這片文章,你會對push模型有很是很是深刻的理解。架構
推消息的經常使用示例app
1public class Consumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4
5 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
6
7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
8
9 consumer.subscribe("TopicTest", "*");
10 consumer.registerMessageListener(new MessageListenerConcurrently() {
11
12 @Override
13 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
14 ConsumeConcurrentlyContext context) {
15 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
16 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
17 }
18 });
19
20 consumer.start();
21
22 System.out.printf("Consumer Started.%n");
23 }
24}
推模式和拉模式在使用上有很大的不一樣,其中實現機制最大的不一樣在於DefaultMQPushConsumer
負載均衡
咱們首先來看一下rocketMq推拉模式實現的主要類圖異步
其實MQConsumer上面還有一個頂級類MQAdmin,沒有畫出來。ide
首先咱們看一下MQConsumer的代碼實現
1/**
2 * Message queue consumer interface
3 */
4public interface MQConsumer extends MQAdmin {
5 /**
6 * If consuming failure,message will be send back to the brokers,and delay consuming some time
7 */
8 @Deprecated
9 void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
10 MQBrokerException, InterruptedException, MQClientException;
11
12 /**
13 * 若是消費失敗,消息會被從新發送到borker, 而且延遲消費。
14 */
15 void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
16 throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
17
18 /**
19 * 獲取消息隊列從訂閱的topic上
20 *
21 * @param topic message topic
22 * @return queue set
23 */
24 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
25}
PullMessageService
介紹拉模式的消息,那麼必需要講一下PullMessageService
, 該類是一個線程類,繼承了ServiceThread這個對象,其實就是一個線程。
下面看一下ServiceThread
的大體實現,隱藏了一些暫時不須要了解的代碼,否則代碼太長了
1public abstract class ServiceThread implements Runnable {
2 private Thread thread;
3 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
4 protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
5 protected volatile boolean stopped = false;
6 protected boolean isDaemon = false;
7
8 //Make it able to restart the thread
9 private final AtomicBoolean started = new AtomicBoolean(false);
10
11 public ServiceThread() {
12
13 }
14
15 public abstract String getServiceName();
16
17 public void start() {
18 log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
19 // 更新啓動狀態,看是否重複啓動
20 if (!started.compareAndSet(false, true)) {
21 return;
22 }
23
24 // 默認是不中止的
25 stopped = false;
26 // 初始化一個線程,runnable就是當前的類
27 this.thread = new Thread(this, getServiceName());
28 // 是不是守護線程, 默認不是守護線程,JVM關閉的時候,須要等當前線程執行完畢
29 this.thread.setDaemon(isDaemon);
30 // 啓動啦。。。
31 this.thread.start();
32 }
33}
看一下PullMessageService
的大體實現
1public class PullMessageService extends ServiceThread {
2 // 省略....
3 @Override
4 public void run() {
5 log.info(this.getServiceName() + " service started");
6
7 while (!this.isStopped()) {
8 try {
9 // 從隊列裏面獲取請求信息
10 PullRequest pullRequest = this.pullRequestQueue.take();
11 // 獲取消息
12 this.pullMessage(pullRequest);
13 } catch (InterruptedException ignored) {
14 } catch (Exception e) {
15 log.error("Pull Message Service Run Method exception", e);
16 }
17 }
18
19 log.info(this.getServiceName() + " service end");
20 }
21 // 省略....
22}
大體的代碼層次就是下面這樣子的
------Runnable
----------ServiceThread
--------------PullMessageService
下面看一下MQClientInstance初始化建立的時候的代碼,刪除了一些不屬於本章節的內容。
1public void start() throws MQClientException {
2 synchronized (this) {
3 switch (this.serviceState) {
4 case CREATE_JUST:
5 // 省略源碼------
6 // 開啓pullService , 針對consumer
7 this.pullMessageService.start();
8 // 省略源碼------
9 break;
10 case RUNNING:
11 break;
12 case SHUTDOWN_ALREADY:
13 break;
14 case START_FAILED:
15 throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
16 default:
17 break;
18 }
19 }
20 }
在MQClientInstance啓動的時候,會調用pullMessageService的start的方法,其實就是調用到了ServiceThread的start方法。
好了,上面咱們已經清楚了什麼是PullMessageService, 在這裏作一下總結。
PullMessageService 就是一個後臺線程,消費者啓動這個線程用來獲取去borker獲取消息的。
這個類裏面有個很重要的東西,就是一個阻塞隊列
1public class PullMessageService extends ServiceThread {
2 private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
3}
這個隊列很是重要,該隊列裏面存儲的是有去獲取消息的請求, 這個時候,就有人連環三問了
什麼時間
什麼地點
什麼人
會往pullRequestQueue
裏面放元素呢?
OK,帶着這個問題,筆者調試了一波代碼,一頓操做猛如虎,總算搞了個頭緒出來。
程序啓動的時候, 首先往這個隊列裏面放元素的是負載均衡器,
org.apache.rocketmq.client.impl.consumer.RebalanceImpl
,負載均衡器會根據負載均衡算法計算出當前消費者須要消費的隊列, 每個隊列構建一個pullRequest
, 而後放入隊列
每次請求消息以後,會把
pullRequest
從新放入隊列, 以供接下來繼續請求程序在運行過程當中,負載均衡器檢測到有新增長的隊列,那麼會把新增長的隊列做爲一個
pullRequest
放入隊列
提問:pullRequest何時會被移除?
答:pullRequest裏面有一個很重要的概念ProcessQueue
, 這個是請求處理隊列,該隊列裏面會維護好一個 dropped
變量,當變成false的時候,這個pullRequest從對立裏面取出來以後,就不會再放回去了,至關於被移除了。 通常在Rebalance
的時候進行移除換句話說,其實就是當消費者數量發生變化或者topic隊列發生變化的時候,經過負載均衡器從新進行負載的時候,若是這個queue再也不被本Consumer消費,那麼其所對應的pullRequest會被移除
1public class PullRequest {
2 // 當前消費組
3 private String consumerGroup;
4 // 消息隊列信息
5 private MessageQueue messageQueue;
6 // 接下來要消費的offSet進度
7 private long nextOffset;
8 // 當前隊列信息
9 private ProcessQueue processQueue;
10 private boolean lockedFirst = false;
11}
那麼接下來看一下push模式的消息拉取是什麼樣子的。
消息拉取
上一個段落裏面,pullMessageService裏面從隊列獲取了一個pullRequest , 而後調用了pullMessage
方法,下面看一下這個方法
org.apache.rocketmq.client.impl.consumer.PullMessageService
1private void pullMessage(final PullRequest pullRequest) {
2 // 從MQClientFactory裏面獲取消費者信息
3 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
4 if (consumer != null) {
5 // 這裏強轉給PushConsumer, 若是不是的話,那麼確定就報錯了,因此這裏作了一個限制
6 DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
7 // 走一把
8 impl.pullMessage(pullRequest);
9 } else {
10 log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
11 }
12 }
步驟說明:
從MQClientFactory裏面獲取消費者信息, 這個在
DefaultMQPushConsumer
初始化的時候,註冊進去的。當時註冊的是DefaultMQPushConsumerImpl
強轉爲
DefaultMQPushConsumerImpl
, 若是不是的話,那麼確定就報錯了,因此這裏作了一個限制調用
DefaultMQPushConsumerImpl
的pullMessage
方法
pullMessage
源碼入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
1public void pullMessage(final PullRequest pullRequest) {
2 // 從pullRequest中獲取處理隊列,每一個RocketMq的queue被消費者消費的時候,都會指定一個pullRequest ,
3 // 那麼這個queue拉取到的緩存消息,消息統計等信息,都在`ProcessQueue`裏面被包含着。
4 final ProcessQueue processQueue = pullRequest.getProcessQueue();
5 // 判斷當前pullRequest是否被丟棄
6 if (processQueue.isDropped()) {
7 log.info("the pull request[{}] is dropped.", pullRequest.toString());
8 return;
9 }
10 // 設置pullRequest處理隊列最後一次的處理時間
11 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
12
13 try {
14 // 檢查服務的狀態是否OK, 也就是是否在Running狀態
15 this.makeSureStateOK();
16 } catch (MQClientException e) {
17 log.warn("pullMessage exception, consumer state not ok", e);
18 // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
19 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
20 return;
21 }
22 // 消費者是否被暫停,若是被暫停的話,則暫時返回
23 if (this.isPause()) {
24 log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
25 // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
26 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
27 return;
28 }
29 // 當前pullRequest總共緩存了多少消息
30 long cachedMessageCount = processQueue.getMsgCount().get();
31 // 計算緩存消息的大小, 以MB爲單位
32 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
33 // pullThresholdForQueue = 1000 , 緩存在消費端的消息最多爲1000條,若是大於,那麼此次就放棄拉取,等待下次
34 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
35 // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
36 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
37 //緩存消息的數量超過了閾值, 流量控制也是, 這裏的意思其實就是說,
38 // 這個消費者不是一次兩次緩存消息超過數量了,而是超過了1000以上了,就是消費者消費的太慢了,大家發的太快了,打印一把日誌
39 if ((queueFlowControlTimes++ % 1000) == 0) {
40 log.warn(
41 "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
42 this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
43 }
44 return;
45 }
46 // 緩存消息大小是否超過100MB,pullThresholdSizeForQueue = 100 ,
47 if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
48 // 將pullRequest放入隊列,只不過是經過後臺的定時線程池延遲放入,防止一直請求
49 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
50 //緩存消息的數量超過了閾值, 流量控制也是, 這裏的意思其實就是說,
51 // 這個消費者不是一次兩次緩存消息大小超過100MB了,而是超過了1000以上了,就是消費者消費的太慢了,大家發的太快了,消息太大了,打印一把日誌
52 if ((queueFlowControlTimes++ % 1000) == 0) {
53 log.warn(
54 "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
55 this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
56 }
57 return;
58 }
59 // 消費端是不是排序消費
60 if (!this.consumeOrderly) {
61 // 判斷消息是否跨過了最大的消息跨度 , 經過offset的大小, 緩存的消息 offset的最大跨度不能夠超過 consumeConcurrentlyMaxSpan = 2000
62 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
63 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
64 if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
65 log.warn(
66 "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
67 processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
68 pullRequest, queueMaxSpanFlowControlTimes);
69 }
70 return;
71 }
72 } else {
73 // 若是是排序消費,那麼pullRequest就是須要上鎖了,只容許一我的同時操做隊列,防止亂序
74 if (processQueue.isLocked()) {
75 if (!pullRequest.isLockedFirst()) {
76 // 計算順序消費的話,從哪裏開始消費爲準, 由於順序消費的話在下面的
77 // PullCallback onSuccess方法裏面,存在一個dispatchToConsume參數
78 // 若是dispatchToConsume = false的話,那麼消息就不會被丟到線程池被後臺線程消費
79 // ,僅在PullRequest的處理隊列裏面
80 final long offset = this.rebalanceImpl.computePullFromWhere
81 (pullRequest.getMessageQueue());
82 boolean brokerBusy = offset < pullRequest.getNextOffset();
83 log.info("the first time to pull message, so fix offset from broker.
84 pullRequest: {} NewOffset: {} brokerBusy: {}",
85 pullRequest, offset, brokerBusy);
86 if (brokerBusy) {
87 log.info("[NOTIFYME]the first time to pull message, but pull request
88 offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
89 pullRequest, offset);
90 }
91 // 設置下次拉取的offSet
92 pullRequest.setLockedFirst(true);
93 pullRequest.setNextOffset(offset);
94 }
95 } else {
96 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
97 log.info("pull message later because not locked in broker, {}", pullRequest);
98 return;
99 }
100 }
101
102 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().
103 get(pullRequest.getMessageQueue().getTopic());
104 if (null == subscriptionData) {
105 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
106 log.warn("find the consumer's subscription failed, {}", pullRequest);
107 return;
108 }
109
110 final long beginTimestamp = System.currentTimeMillis();
111 // 消息結果回調
112 PullCallback pullCallback = new PullCallback() {
113 @Override
114 public void onSuccess(PullResult pullResult) {
115 if (pullResult != null) {
116 // 獲取pullResult
117 pullResult = DefaultMQPushConsumerImpl.this.
118 pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
119 subscriptionData);
120 // 消息結果狀態
121 switch (pullResult.getPullStatus()) {
122 case FOUND: // 找到消息了
123 long prevRequestOffset = pullRequest.getNextOffset();
124 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
125 long pullRT = System.currentTimeMillis() - beginTimestamp;
126 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
127 incPullRT(pullRequest.getConsumerGroup(),
128 pullRequest.getMessageQueue().getTopic(), pullRT);
129
130 long firstMsgOffset = Long.MAX_VALUE;
131 // 判斷消息結果是否爲空,
132 if (pullResult.getMsgFoundList() == null || pullResult.getMsgFou
133 ndList().isEmpty()) {
134 // 爲空就等待下次繼續拉取
135 DefaultMQPushConsumerImpl.this.
136 executePullRequestImmediately(pullRequest);
137 } else {
138 // 不爲空,獲取第一條消息的firstMsgOffset
139 firstMsgOffset = pullResult.getMsgFoundList().get(0).
140 getQueueOffset();
141
142 // 這裏貌似是添加消息的統計信息, TPS之類的
143 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
144 incPullTPS(pullRequest.getConsumerGroup(), pullRequest.
145 getMessageQueue().getTopic(), pullResult.getMsgFoundList()
146 .size());
147 // 先將本次拉取到的消息放入到pullRequest處理隊列裏面去
148 boolean dispatchToConsume = processQueue.putMessage(pullResult.
149 getMsgFoundList());
150
151 // 來來來,哥幾個,這裏就是重點了,將消息放入後臺消費者的線程池並行消費。
152 // dispatchToConsume 這個參數僅針對順序消費的場景,並行消費的場景不用這個參數
153 DefaultMQPushConsumerImpl.this.consumeMessageService.
154 submitConsumeRequest(
155 pullResult.getMsgFoundList(),
156 processQueue,
157 pullRequest.getMessageQueue(),
158 dispatchToConsume);
159
160 // 下次獲取消息的時間間隔,pullInterval = 0 ,默認值
161 if (DefaultMQPushConsumerImpl.this.
162 defaultMQPushConsumer.getPullInterval() > 0) {
163 // 若是大於0 ,則延遲獲取
164 DefaultMQPushConsumerImpl.this.
165 executePullRequestLater(pullRequest,
166 DefaultMQPushConsumerImpl.this.
167 defaultMQPushConsumer.getPullInterval());
168 } else {
169 // 直接塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
170 DefaultMQPushConsumerImpl.this.
171 executePullRequestImmediately(pullRequest);
172 }
173 }
174
175 if (pullResult.getNextBeginOffset() < prevRequestOffset
176 || firstMsgOffset < prevRequestOffset) {
177 log.warn(
178 "[BUG] pull message result maybe
179 data wrong, nextBeginOffset: {}
180 firstMsgOffset: {} prevRequestOffset: {}",
181 pullResult.getNextBeginOffset(),
182 firstMsgOffset,
183 prevRequestOffset);
184 }
185
186 break;
187 case NO_NEW_MSG: // 沒有新消息
188 // 沒有新消息,將pullRequest塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
189 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
190
191 DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
192
193 DefaultMQPushConsumerImpl.this.
194 executePullRequestImmediately(pullRequest);
195 break;
196 case NO_MATCHED_MSG: // 沒有找到匹配的消息
197 // 沒有新消息,將pullRequest塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
198 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
199
200 DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
201
202 DefaultMQPushConsumerImpl.this.
203 executePullRequestImmediately(pullRequest);
204 break;
205 case OFFSET_ILLEGAL:
206 // offset錯誤,
207 log.warn("the pull request offset illegal, {} {}",
208 pullRequest.toString(), pullResult.toString());
209 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
210 // pullRequest設置爲丟棄,後面不獲取消息了
211 pullRequest.getProcessQueue().setDropped(true);
212 // 延遲執行,10秒鐘
213 DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
214
215 @Override
216 public void run() {
217 try {
218 DefaultMQPushConsumerImpl.this.offsetStore.
219 updateOffset(pullRequest.getMessageQueue(),
220 pullRequest.getNextOffset(), false);
221
222 DefaultMQPushConsumerImpl.this.offsetStore.
223 persist(pullRequest.getMessageQueue());
224 // 移除
225 DefaultMQPushConsumerImpl.this.rebalanceImpl.
226 removeProcessQueue(pullRequest.getMessageQueue());
227
228 log.warn("fix the pull request offset, {}", pullRequest);
229 } catch (Throwable e) {
230 log.error("executeTaskLater Exception", e);
231 }
232 }
233 }, 10000);
234 break;
235 default:
236 break;
237 }
238 }
239 }
240
241 @Override
242 public void onException(Throwable e) {
243 // 若是出現異常的話,走這個地方
244 if (!pullRequest.getMessageQueue().getTopic().
245 startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
246 log.warn("execute the pull request exception", e);
247 }
248 // 將消息請求再次放入隊列,只不過放入的是須要延遲一會發送的
249 DefaultMQPushConsumerImpl.this.
250 executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
251 }
252 };
253 // 獲取已提交offSet值
254 boolean commitOffsetEnable = false;
255 long commitOffsetValue = 0L;
256 // 判斷是否爲集羣消費,由於廣播消費的offSet不提交到broker上面去, 這個是爲了去borker上面獲取消息用的
257 if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
258 // 從消息存儲器裏面獲取
259 commitOffsetValue = this.offsetStore.
260 readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
261 if (commitOffsetValue > 0) {
262 commitOffsetEnable = true;
263 }
264 }
265
266 String subExpression = null;
267 boolean classFilter = false;
268 // 從負載均衡器裏面獲取消費組的信息
269 SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().
270 get(pullRequest.getMessageQueue().getTopic());
271 if (sd != null) {
272 if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull()
273 && !sd.isClassFilterMode()) {
274 subExpression = sd.getSubString();
275 }
276
277 classFilter = sd.isClassFilterMode();
278 }
279 // 構建系統flag
280 int sysFlag = PullSysFlag.buildSysFlag(
281 commitOffsetEnable, // commitOffset
282 true, // suspend
283 subExpression != null, // subscription
284 classFilter // class filter
285 );
286 try {
287 // 拉取消息
288 this.pullAPIWrapper.pullKernelImpl(
289 pullRequest.getMessageQueue(),
290 subExpression,
291 subscriptionData.getExpressionType(),
292 subscriptionData.getSubVersion(),
293 // 須要消費的offSet起始值
294 pullRequest.getNextOffset(),
295 // 一次獲取消息多少的大小
296 this.defaultMQPushConsumer.getPullBatchSize(),
297 sysFlag,
298 commitOffsetValue,
299 // broker端阻塞的時間15秒,默認
300 BROKER_SUSPEND_MAX_TIME_MILLIS,
301 //過時時間30秒,默認
302 CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
303 // 默認爲異步 , 獲取結果後回調pullCallback的onSuccess方法
304 CommunicationMode.ASYNC,
305 pullCallback
306 );
307 } catch (Exception e) {
308 log.error("pullKernelImpl exception", e);
309 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
310 }
311 }
上面一大坨的代碼,有些朋友基本上就慌了,可能瞬間就下關掉這篇文章了,其實上面的代碼個人註釋基本上寫的很是清楚了,下面用文字總結一下
步驟說明:
從
pullRequest
中獲取處理隊列,每一個RocketMq的queue被消費者消費的時候,都會指定一個pullRequest
,
那麼這個queue拉取到的緩存消息,消息統計等信息,都在ProcessQueue
裏面被包含着。設置pullRequest處理隊列最後一次的處理時間
檢查服務的狀態是否OK, 也就是是否在Running狀態
rocketMq的流量控制,push模式下的,總共分爲三種流控模式, 在講流控模式以前,首先咱們要知道一點,全部的消息獲取過來的時候都是先放到
ProcessQueue
裏面去的,也就是說ProcessQueue
緩存了全部未處理的消息。緩存消息數量控制 ,
pullThresholdForQueue = 1000
, 緩存在消費端的消息最多爲1000條,若是大於,那麼此次就放棄拉取,等待下次緩存消息大小控制,緩存消息大小是否超過100MB,
pullThresholdSizeForQueue = 100
,若是大於100MB,那麼此次就放棄拉取,等待下次拉取針對並行消費(普通消息), 須要控制隊列的offSet的跨度, 跨度不能夠超過2000,
針對集羣消費,獲取已提交offSet值,由於廣播消費的offSet不提交到broker上面去, 這個是爲了去borker上面獲取消息用的
獲取消息,調用
pullAPIWrapper.pullKernelImpl
方法,該方法有幾個比較重要的參數,1// 拉取消息
2this.pullAPIWrapper.pullKernelImpl(
3 // 請求的隊列
4 pullRequest.getMessageQueue(),
5 // 訂閱組的信息(className)
6 subExpression,
7 // 表達式類型:TAG類型
8 subscriptionData.getExpressionType(),
9 //訂閱組的版本(時間戳)
10 subscriptionData.getSubVersion(),
11 // 須要消費的offSet起始值
12 pullRequest.getNextOffset(),
13 // 一次獲取消息多少的大小
14 this.defaultMQPushConsumer.getPullBatchSize(),
15 sysFlag,
16 // 已經提交的offSet值
17 commitOffsetValue,
18 // broker端阻塞的時間15秒,默認
19 BROKER_SUSPEND_MAX_TIME_MILLIS,
20 //過時時間30秒,默認
21 CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
22 // 默認爲異步 , 獲取結果後回調pullCallback的onSuccess方法
23 CommunicationMode.ASYNC,
24 pullCallback
25);BROKER_SUSPEND_MAX_TIME_MILLIS
: 15秒,這個其實就是push模式的實現,當請求到達borker的時候,若是發現沒有消息,會阻塞15秒鐘,由這個參數控制,pullCallback
: 獲取到結果以後,會進行結果回調,調用onSuccess方法pullCallback
的結果回調 , 結果回調狀態總共有下面幾種1public enum PullStatus {
2 /**
3 * 消息找到了
4 */
5 FOUND,
6 /**
7 * 沒有新消息過來
8 */
9 NO_NEW_MSG,
10 /**
11 * 沒有匹配到的消息
12 */
13 NO_MATCHED_MSG,
14 /**
15 * 錯誤的offSet值,太大了或者過小了
16 */
17 OFFSET_ILLEGAL
18}咱們主要講消息找到了的那種場景。
FOUND消息狀態
1switch (pullResult.getPullStatus()) {
2 case FOUND: // 找到消息了
3 long prevRequestOffset = pullRequest.getNextOffset();
4 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
5 long pullRT = System.currentTimeMillis() - beginTimestamp;
6 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
7 incPullRT(pullRequest.getConsumerGroup(),
8 pullRequest.getMessageQueue().getTopic(), pullRT);
9 long firstMsgOffset = Long.MAX_VALUE;
10 // 判斷消息結果是否爲空,
11 if (pullResult.getMsgFoundList() == null||pullResult.getMsgFoundList().isEmpty()) {
12 // 爲空就等待下次繼續拉取
13 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
14 } else {
15 // 不爲空,獲取第一條消息的firstMsgOffset
16 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
17 // 這裏貌似是添加消息的統計信息, TPS之類的
18 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().
19 incPullTPS(pullRequest.getConsumerGroup(),
20 pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
21 // 先將本次拉取到的消息放入到pullRequest處理隊列裏面去
22 boolean dispatchToConsume = processQueue.putMessage(pullResult.
23 getMsgFoundList());
24 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
25 pullResult.getMsgFoundList(),
26 processQueue,
27 pullRequest.getMessageQueue(),
28 dispatchToConsume);
29 // 下次獲取消息的時間間隔,pullInterval = 0 ,默認值
30 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()>0) {
31 // 若是大於0 ,則延遲獲取
32 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
33 DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
34 } else {
35 // 直接塞到阻塞隊列裏面去,幾乎立刻就會繼續請求
36 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
37 }
38 }
39 if (pullResult.getNextBeginOffset() < prevRequestOffset
40 || firstMsgOffset < prevRequestOffset) {
41 log.warn(
42 "[BUG] pull message result maybe data wrong
43 , nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
44 pullResult.getNextBeginOffset(),
45 firstMsgOffset,
46 prevRequestOffset);
47 }
48 break;
49 default:
50 break;
51}步驟說明:
1.判斷消息結果是否爲空,爲空就等待下次繼續拉取,不爲空,則繼續處理消息
2.調用processQueue.putMessage
方法,先將本次拉取到的消息放入到pullRequest處理隊列裏面去,processQueue
維護了一個TreeMap , key = offSet , value爲消息。
3.putMessage方法中,會維護兩個變量來針對順序消費,consuming
: 是否在被消費,dispatchToConsume
: 是否須要分發給消費者消息處理現場1 // 順序消費
2 // 線路1: 第一次來了 10 條消息, 設置 dispatchToConsume = true , consuming = true
3 // dispatchToConsume = true , 那麼放入一個請求到線程池裏面,
4 // 線路2: 又來了10 條消息 , 發現consuming = true, 正在消費,則返回dispatchToConsume = false
5 // 線路2: 線路2的10條消息不會往線程池裏面丟request
6
7 // 線路1: 開始讀取processQueue裏面的消息,
8
9 // 線路3: 又來了10 條消息 , 發現consuming = true, 正在消費,則返回dispatchToConsume = false
10 // 線路3: 線路3的10條消息不會往線程池裏面丟request
11
12 // 線路1: 讀取完畢線程迴歸,設置consuming = false
13
14 // 線路4: 又來了10 條消息 , 發現consuming = false, 沒有在消費,則返回dispatchToConsume = true
15 // 線路4: 線路4的會往線程池裏面建立一個Request
16
17 // 綜上所述: 順序消費有且僅會建立一個線程 。 從而保證順序消費。經過pullRequest 裏面的ProcessQueue 的treeMap ,實現offset最小的先消費4.將消息submit到線程池裏面去,至此消息拉取完畢
總結:
RocketMq的推模式不是嚴格意義上的推,是經過後臺啓動異步線程,一個queue構建一個pullRequest, 異步的去請求的拉模式,只不過是經過broker端阻塞(默認阻塞15秒)的方法,達到了推模式的效果。其實就是長輪詢模式,哈哈同時,rocketMq經過流量控制模塊(消息數量(1000),消息大小(100MB),offSet跨度(並行消費))來控制消費者這一邊的壓力,不至於消費慢,被後臺異步線程給壓死。
歡迎關注個人微信公衆號:【sharedCode】
回覆:「資源」、「架構」、「開發手冊」等關鍵詞獲取海量免費學習資料。
本文分享自微信公衆號 - sharedCode(sharedCode)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。