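This chapter analyzes the source code behind the message consumption logic.
Because of its length, it is split into two parts:

- Part 1: `Broker`-related source code.
- Part 2: `Consumer`-related source code.

This article is Part 1.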
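First, a diagram of the consumption logic:

![Consumption logic diagram]()

Next, a simplified sequence diagram of the consumption logic (the actual flow differs slightly):

![Simplified Consumer and Broker consumption sequence diagram]()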
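The relationship between `ConsumeQueue`, `MappedFileQueue`, and `MappedFile` is as follows:

![Relationship between ConsumeQueue, MappedFileQueue, and MappedFile]()

`ConsumeQueue` : `MappedFileQueue` : `MappedFile` = 1 : 1 : N.

On the file system this looks as follows: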
```bash
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000
```
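`ConsumeQueue`, `MappedFileQueue`, and `MappedFile` are defined as follows:

- `MappedFile`: files such as `00000000000000000000`.
- `MappedFileQueue`: the directory that holds the `MappedFile`s. It wraps them into a file queue and offers the upper layer a file capacity that can be used without limit.
    - Every `MappedFile` has the same file size. In `ConsumeQueue` the default is 6000000 B.
- `ConsumeQueue`: a wrapper around `MappedFileQueue`.
    - `Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>`.

Every entry that `ConsumeQueue` stores in a `MappedFile` must be exactly 20 B (`ConsumeQueue.CQ_STORE_UNIT_SIZE`). There are two entry types: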
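- `MESSAGE_POSITION_INFO`: message position information.
- `BLANK`: leading blank placeholder in a file. When historical `Message`s are deleted, `BLANK` entries take the place of the deleted messages.

Storage layout of `MESSAGE_POSITION_INFO` in `ConsumeQueue`:

| Position | Field | Description | Data type | Bytes |
|---|---|---|---|---|
| 1 | offset | Storage position of the message in CommitLog | Long | 8 |
| 2 | size | Message length | Int | 4 |
| 3 | tagsCode | Message tagsCode | Long | 8 |

Storage layout of `BLANK` in `ConsumeQueue`:

| Position | Field | Description | Data type | Bytes |
|---|---|---|---|---|
| 1 | | 0 | Long | 8 |
| 2 | | Integer.MAX_VALUE | Int | 4 |
| 3 | | 0 | Long | 8 |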
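
To make the 20-byte layout concrete, here is a minimal sketch that encodes both entry types with a plain `ByteBuffer` and works out which file a queue offset falls into. The names `ConsumeQueueEntrySketch`, `encodePosition`, `encodeBlank`, and `fileStartOffset` are made up for this illustration and are not RocketMQ APIs. The constants are the 20 B unit size and the 6000000 B default file size quoted above. The file-start calculation assumes that a file is identified by the byte offset at which it starts, which the `00000000000000000000` file in the listing suggests but does not prove.

```java
import java.nio.ByteBuffer;

class ConsumeQueueEntrySketch {
    static final int CQ_STORE_UNIT_SIZE = 20;      // 8 + 4 + 8 bytes per entry
    static final int MAPPED_FILE_SIZE = 6_000_000; // default size quoted in the text

    /** Encodes one MESSAGE_POSITION_INFO entry: offset (8 B), size (4 B), tagsCode (8 B). */
    static byte[] encodePosition(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(size);
        buffer.putLong(tagsCode);
        return buffer.array();
    }

    /** Encodes one BLANK entry: 0, Integer.MAX_VALUE, 0. */
    static byte[] encodeBlank() {
        return encodePosition(0L, Integer.MAX_VALUE, 0L);
    }

    /** Assumption: byte offset at which the file holding this queue offset starts. */
    static long fileStartOffset(long queueOffset) {
        long logicOffset = queueOffset * CQ_STORE_UNIT_SIZE;
        return logicOffset - logicOffset % MAPPED_FILE_SIZE;
    }

    public static void main(String[] args) {
        byte[] entry = encodePosition(1024L, 180, 42L);
        ByteBuffer view = ByteBuffer.wrap(entry);
        System.out.println("entry bytes  = " + entry.length);
        System.out.println("offset       = " + view.getLong());
        System.out.println("size         = " + view.getInt());
        System.out.println("tagsCode     = " + view.getLong());
        // 6000000 / 20 = 300000 entries fit into one file.
        System.out.println("file start for queue offset 299999 = " + fileStartOffset(299_999L));
        System.out.println("file start for queue offset 300000 = " + fileStartOffset(300_000L));
    }
}
```

With these constants one file holds 300000 entries, so queue offset 300000 is the first entry of the second file.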
There are two main components:

- `ReputMessageService`: writes `ConsumeQueue`.
- `FlushConsumeQueueService`: flushes `ConsumeQueue`.

```java
class ReputMessageService extends ServiceThread {

    /**
     * Physical CommitLog offset from which message replay starts
     */
    private volatile long reputFromOffset = 0;

    public long getReputFromOffset() {
        return reputFromOffset;
    }

    public void setReputFromOffset(long reputFromOffset) {
        this.reputFromOffset = reputFromOffset;
    }

    @Override
    public void shutdown() {
        for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
        }

        if (this.isCommitLogAvailable()) {
            log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
                DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
        }

        super.shutdown();
    }

    /**
     * Number of bytes that still have to be replayed
     *
     * @return number of bytes
     */
    public long behind() {
        return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
    }

    /**
     * Whether the CommitLog still has messages to replay
     *
     * @return true if there is something left to replay
     */
    private boolean isCommitLogAvailable() {
        return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
    }

    private void doReput() {
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

            // TODO Question: what is this for?
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }

            // Get the MappedByteBuffer of the CommitLog MappedFile that starts at reputFromOffset
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset();

                    // Iterate over the MappedByteBuffer
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        // Build the dispatch request for replaying the message
                        DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getMsgSize(); // message length
                        // Handle the request according to its result
                        if (dispatchRequest.isSuccess()) { // read succeeded
                            if (size > 0) { // a Message was read
                                DefaultMessageStore.this.doDispatch(dispatchRequest);
                                // Notify that a new message has arrived
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode());
                                }
                                // FIXED BUG By shijia
                                this.reputFromOffset += size;
                                readSize += size;
                                // Statistics
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) { // reached the end of the MappedFile
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) { // read failed
                            if (size > 0) { // sized like a Message, but not a valid Message
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else { // looks like a Blank, but not a valid Blank
                                doNext = false;
                                if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);

                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }

    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return ReputMessageService.class.getSimpleName();
    }

}
```
- Get the `MappedByteBuffer` of the `MappedFile` in `CommitLog` that starts at `reputFromOffset`.
- Iterate over the `MappedByteBuffer`.
- Build the dispatch request (`DispatchRequest`) for replaying a message. The request mainly carries the basic information of one message (`Message`) or of the end of a file (`BLANK`).
- When the request was read successfully, process it:
    - When the `Broker` is a master node && the `Broker` has long polling enabled, notify the consume queue that a new message has arrived. `NotifyMessageArrivingListener` calls `PullRequestHoldService#notifyMessageArriving(...)`. For details see: PullRequestHoldService.
    - When the request corresponds to a `Message`, dispatch it to generate the matching `ConsumeQueue` and `IndexFile` content. For details see `doDispatch(...)`.
    - When the request corresponds to `Blank`, i.e. the end of a file, jump to the next `MappedFile`.
- On `shutdown`, `sleep(100)` is called repeatedly until `CommitLog` has been replayed up to the latest position. If replay has still not finished by then, a warning is logged.

```java
/**
 * Executes a dispatch request
 * 1. For a non-transactional message or a transaction-commit message, builds the message position info into ConsumeQueue
 * 2. Builds the index info into IndexFile
 *
 * @param req dispatch request
 */
public void doDispatch(DispatchRequest req) {
    // For a non-transactional message or a transaction-commit message, build the message position info into ConsumeQueue
    final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
                req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
            break;
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
    }
    // Build the index info into IndexFile
    if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
        DefaultMessageStore.this.indexService.buildIndex(req);
    }
}

/**
 * Builds the message position info into ConsumeQueue
 *
 * @param topic topic
 * @param queueId queue id
 * @param offset storage position in the CommitLog
 * @param size message length
 * @param tagsCode message tagsCode
 * @param storeTimestamp store timestamp
 * @param logicOffset position in the queue
 */
public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
    long logicOffset) {
    ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
    cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}
```
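
The `switch` in `doDispatch` decides from the transaction bits in `sysFlag` whether a message gets a `ConsumeQueue` entry, and therefore whether consumers can see it. The sketch below mimics that decision in isolation. The constant values and the masking in `transactionValue` are written from memory of `MessageSysFlag` and are an assumption; check them against the RocketMQ version you are reading. `TransactionDispatchSketch` and `writesConsumeQueue` are hypothetical names.

```java
class TransactionDispatchSketch {
    // Assumed to mirror MessageSysFlag: the transaction type occupies bits 2 and 3 of sysFlag.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    /** Extracts the transaction type by masking out every other bit. */
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** True when a message with this sysFlag would get a ConsumeQueue entry. */
    static boolean writesConsumeQueue(int sysFlag) {
        switch (transactionValue(sysFlag)) {
            case TRANSACTION_NOT_TYPE:
            case TRANSACTION_COMMIT_TYPE:
                return true;
            default: // prepared or rolled back: never visible to consumers
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("normal   -> " + writesConsumeQueue(TRANSACTION_NOT_TYPE));
        System.out.println("prepared -> " + writesConsumeQueue(TRANSACTION_PREPARED_TYPE));
        System.out.println("commit   -> " + writesConsumeQueue(TRANSACTION_COMMIT_TYPE));
        System.out.println("rollback -> " + writesConsumeQueue(TRANSACTION_ROLLBACK_TYPE));
    }
}
```

Index building is independent of this decision: in `doDispatch` it depends only on `isMessageIndexEnable()`.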
```java
/**
 * Wrapper for adding position info
 *
 * @param offset storage position in the CommitLog
 * @param size message length
 * @param tagsCode message tagsCode
 * @param storeTimestamp message store timestamp
 * @param logicOffset position in the queue
 */
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
    long logicOffset) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
    // Retry the write (at most maxRetries times) until it succeeds
    for (int i = 0; i < maxRetries && canWrite; i++) {
        // Add the position info
        boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
        if (result) {
            // Added successfully: use the message store timestamp as the store check point.
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me. Mark the queue as faulty so it can no longer be written.
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

/**
 * Adds position info and returns whether the add succeeded
 *
 * @param offset storage position in the CommitLog
 * @param size message length
 * @param tagsCode message tagsCode
 * @param cqOffset position in the queue
 * @return whether the add succeeded
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    // Already replayed: return success directly
    if (offset <= this.maxPhysicOffset) {
        return true;
    }
    // Write the position info into byteBuffer
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    // Compute the ConsumeQueue storage position and get the corresponding MappedFile
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // If this is the first MappedFile of the ConsumeQueue && the queue position is not the first
        // && nothing has been written to the MappedFile yet, fill in the leading blank placeholders.
        // TODO Question: why this step? The only case I can think of: no message has been sent for
        // a long time and then one suddenly arrives, which happens to satisfy this condition.
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }
        // Validate the ConsumeQueue storage position. TODO If it is invalid, is it a problem to keep writing?
        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        // Record the CommitLog position up to which messages have been replayed into ConsumeQueue.
        this.maxPhysicOffset = offset;
        // Append to the mappedFile
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

/**
 * Fills in the leading blank placeholders
 *
 * @param mappedFile MappedFile
 * @param untilWhere ConsumeQueue storage position
 */
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
    // Write one leading blank placeholder into byteBuffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    byteBuffer.putLong(0L);
    byteBuffer.putInt(Integer.MAX_VALUE);
    byteBuffer.putLong(0L);
    // Fill in a loop
    int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
    for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
        mappedFile.appendMessage(byteBuffer.array());
    }
}
```
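
The arithmetic in `fillPreBlank` above can be checked in isolation. This is a minimal sketch, assuming the 20 B unit size and the 6000000 B default file size mentioned earlier. `PreBlankSketch` and `blankEntriesBefore` are hypothetical names and not part of RocketMQ.

```java
class PreBlankSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;
    static final int MAPPED_FILE_SIZE = 6_000_000;

    /**
     * Counts the BLANK entries that the fill loop would append before the
     * first real entry at queue offset cqOffset.
     */
    static int blankEntriesBefore(long cqOffset) {
        long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        // Position of the entry inside its file, in bytes
        int until = (int) (expectLogicOffset % MAPPED_FILE_SIZE);
        int count = 0;
        for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
            count++; // one 20-byte BLANK entry per step
        }
        return count;
    }

    public static void main(String[] args) {
        // Queue offset 300005 is the 6th slot of the second file: 5 blanks come first.
        System.out.println(blankEntriesBefore(300_005L));
        // Queue offset 300000 is the first slot of the second file: nothing to fill.
        System.out.println(blankEntriesBefore(300_000L));
    }
}
```

Only the slots inside the current file are padded; earlier files are not recreated, which is why the modulo is taken against the file size.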
- `#putMessagePositionInfoWrapper(...)`: a wrapper for adding position info to `ConsumeQueue`. The actual work is done by `#putMessagePositionInfo(...)`.
    - Check whether `ConsumeQueue` is writable. Writing is not allowed once a bug has occurred.
    - Call `#putMessagePositionInfo(...)` to add the position info.
    - On success, use the message store timestamp as the store check point. For details on `StoreCheckpoint` see: Store initialization and shutdown.
    - If writing keeps failing, mark `ConsumeQueue` as faulty so that no further writes are allowed.
- `#putMessagePositionInfo(...)`: adds position info to `ConsumeQueue` and returns whether the add succeeded.
    - If `offset` (storage position) is less than or equal to `maxPhysicOffset` (the largest `CommitLog` storage position already replayed into `ConsumeQueue`), the message has already been replayed. It is not written again and the method returns success directly.
    - Compute the `ConsumeQueue` storage position and get the corresponding `MappedFile`.
    - When the `MappedFile` is currently the first file of the `ConsumeQueue` && nothing has been written to the `MappedFile` && the replayed queue position is greater than 0, the `MappedFile` must be filled with leading `BLANK` entries.
        - A guess at when this happens: a `Topic` produces no messages for a long time and then sends again N days later. By then the historical messages and the consume queue data of the `Topic` have been cleaned up, so the newly created `MappedFile` needs leading placeholders.
    - Validate the `ConsumeQueue` storage position. If it is invalid, a log entry is written.
    - Record the largest `CommitLog` position replayed into `ConsumeQueue`.
    - Append the entry to the `MappedFile`.

```java
class FlushConsumeQueueService extends ServiceThread {
    private static final int RETRY_TIMES_OVER = 3;
    /**
     * Timestamp of the last flush
     */
    private long lastFlushTimestamp = 0;

    private void doFlush(int retryTimes) {
        int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

        // When retryTimes == RETRY_TIMES_OVER, force a flush. Mainly used on shutdown.
        if (retryTimes == RETRY_TIMES_OVER) {
            flushConsumeQueueLeastPages = 0;
        }
        // Once flushConsumeQueueThoroughInterval has elapsed, flush even if fewer than
        // flushConsumeQueueLeastPages pages have been written
        long logicsMsgTimestamp = 0;
        int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushConsumeQueueLeastPages = 0;
            logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
        }
        // Flush the consume queues
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue cq : maps.values()) {
                boolean result = false;
                for (int i = 0; i < retryTimes && !result; i++) {
                    result = cq.flush(flushConsumeQueueLeastPages);
                }
            }
        }
        // Flush the store check point
        if (0 == flushConsumeQueueLeastPages) {
            if (logicsMsgTimestamp > 0) {
                DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
            }
            DefaultMessageStore.this.getStoreCheckpoint().flush();
        }
    }

    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                this.waitForRunning(interval);
                this.doFlush(1);
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        this.doFlush(RETRY_TIMES_OVER);

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushConsumeQueueService.class.getSimpleName();
    }

    @Override
    public long getJointime() {
        return 1000 * 60;
    }
}
```
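
The page-threshold handling in `doFlush` above comes down to one small decision: when is the configured minimum page count overridden with 0, so that the flush is unconditional? A minimal sketch of that decision follows. `FlushPolicySketch` and `effectiveLeastPages` are hypothetical names, and the numbers passed in `main` (2 pages, 60000 ms) are illustrative values, not RocketMQ defaults.

```java
class FlushPolicySketch {
    static final int RETRY_TIMES_OVER = 3;

    /**
     * Returns the minimum number of dirty pages required before flushing.
     * 0 means "flush whatever is there".
     */
    static int effectiveLeastPages(int configuredLeastPages, int retryTimes,
                                   long nowMillis, long lastFlushMillis, long thoroughIntervalMillis) {
        if (retryTimes == RETRY_TIMES_OVER) {
            return 0; // forced flush, used on shutdown
        }
        if (nowMillis >= lastFlushMillis + thoroughIntervalMillis) {
            return 0; // periodic thorough flush
        }
        return configuredLeastPages;
    }

    public static void main(String[] args) {
        // Regular loop iteration shortly after the last thorough flush: threshold applies.
        System.out.println(effectiveLeastPages(2, 1, 1_000L, 900L, 60_000L));
        // Shutdown path: threshold is dropped.
        System.out.println(effectiveLeastPages(2, RETRY_TIMES_OVER, 1_000L, 900L, 60_000L));
        // Thorough interval elapsed: threshold is dropped.
        System.out.println(effectiveLeastPages(2, 1, 70_000L, 0L, 60_000L));
    }
}
```

In the real method the thorough-flush branch also updates `lastFlushTimestamp` and later flushes `StoreCheckpoint`; the sketch leaves those side effects out.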
- `FlushConsumeQueueService` is the thread service that flushes `ConsumeQueue` (consume queue).
- When `retryTimes == RETRY_TIMES_OVER`, a flush is forced. This is used on `shutdown`.
- Flush `ConsumeQueue` (consume queue).
- Flush `StoreCheckpoint`. For details on `StoreCheckpoint` see: Store initialization and shutdown.
- `run()` performs one `flush` per `flushIntervalConsumeQueue` interval. If `wakeup()` is called, a `flush` runs immediately. At present nothing calls `wakeup()`.

```java
public class PullMessageRequestHeader implements CommandCustomHeader {
    /**
     * Consumer group
     */
    @CFNotNull
    private String consumerGroup;
    /**
     * Topic
     */
    @CFNotNull
    private String topic;
    /**
     * Queue id
     */
    @CFNotNull
    private Integer queueId;
    /**
     * Queue offset to start from
     */
    @CFNotNull
    private Long queueOffset;
    /**
     * Number of messages
     */
    @CFNotNull
    private Integer maxMsgNums;
    /**
     * System flag
     */
    @CFNotNull
    private Integer sysFlag;
    /**
     * Consume offset to commit
     */
    @CFNotNull
    private Long commitOffset;
    /**
     * Suspend timeout
     */
    @CFNotNull
    private Long suspendTimeoutMillis;
    /**
     * Subscription expression
     */
    @CFNullable
    private String subscription;
    /**
     * Subscription version
     */
    @CFNotNull
    private Long subVersion;
}
```
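
The `sysFlag` field packs several booleans into one integer. The sketch below shows how such a bit mask can be built and tested. The bit positions are assumed to match `PullSysFlag` and are written from memory, so verify them against your RocketMQ version. `PullSysFlagSketch`, `build`, and `has` are hypothetical stand-ins, not the real class or its methods.

```java
class PullSysFlagSketch {
    // Assumed bit positions; check them against the real PullSysFlag.
    static final int FLAG_COMMIT_OFFSET = 0x1 << 0;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;

    /** Builds a sysFlag value from three independent booleans. */
    static int build(boolean commitOffset, boolean suspend, boolean subscription) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        return flag;
    }

    /** Tests whether one flag bit is set. */
    static boolean has(int sysFlag, int bit) {
        return (sysFlag & bit) == bit;
    }

    public static void main(String[] args) {
        int sysFlag = build(true, true, false);
        System.out.println("sysFlag             = " + sysFlag);
        System.out.println("commit offset flag  = " + has(sysFlag, FLAG_COMMIT_OFFSET));
        System.out.println("suspend flag        = " + has(sysFlag, FLAG_SUSPEND));
        System.out.println("subscription flag   = " + has(sysFlag, FLAG_SUBSCRIPTION));
    }
}
```

Each flag only says that a feature is requested; the value it applies to travels in a separate header field such as `commitOffset` or `suspendTimeoutMillis`.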
FLAG_COMMIT_OFFSET
:標記請求提交消費進度位置,和 commitOffset
配合。FLAG_SUSPEND
:標記請求是否掛起請求,和 suspendTimeoutMillis
配合。當拉取不到消息時, Broker
會掛起請求,直到有消息。最大掛起時間:suspendTimeoutMillis
毫秒。FLAG_SUBSCRIPTION
:是否過濾訂閱表達式,和 subscription
配置。1: private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) 2: throws RemotingCommandException {
3: RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
4: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
5: final PullMessageRequestHeader requestHeader =
6: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
7:
8: response.setOpaque(request.getOpaque());
9:
10: if (LOG.isDebugEnabled()) {
11: LOG.debug("receive PullMessage request command, {}", request);
12: }
13:
14: // 校驗 broker 是否可讀
15: if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
16: response.setCode(ResponseCode.NO_PERMISSION);
17: response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
18: return response;
19: }
20:
21: // 校驗 consumer分組配置 是否存在
22: SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
23: if (null == subscriptionGroupConfig) {
24: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
25: response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
26: return response;
27: }
28: // 校驗 consumer分組配置 是否可消費
29: if (!subscriptionGroupConfig.isConsumeEnable()) {
30: response.setCode(ResponseCode.NO_PERMISSION);
31: response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
32: return response;
33: }
34:
35: final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag()); // 是否掛起請求,當沒有消息時
36: final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag()); // 是否提交消費進度
37: final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag()); // 是否過濾訂閱表達式(subscription)
38: final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0; // 掛起請求超時時長
39:
40: // 校驗 topic配置 存在
41: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
42: if (null == topicConfig) {
43: LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
44: response.setCode(ResponseCode.TOPIC_NOT_EXIST);
45: response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
46: return response;
47: }
48: // 校驗 topic配置 權限可讀
49: if (!PermName.isReadable(topicConfig.getPerm())) {
50: response.setCode(ResponseCode.NO_PERMISSION);
51: response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
52: return response;
53: }
54: // 校驗 讀取隊列 在 topic配置 隊列範圍內
55: if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
56: String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
57: requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
58: LOG.warn(errorInfo);
59: response.setCode(ResponseCode.SYSTEM_ERROR);
60: response.setRemark(errorInfo);
61: return response;
62: }
63:
64: // 校驗 訂閱關係
65: SubscriptionData subscriptionData;
66: if (hasSubscriptionFlag) {
67: try {
68: subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
69: requestHeader.getSubscription());
70: } catch (Exception e) {
71: LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
72: requestHeader.getConsumerGroup());
73: response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
74: response.setRemark("parse the consumer's subscription failed");
75: return response;
76: }
77: } else {
78: // 校驗 消費分組信息 是否存在
79: ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
80: if (null == consumerGroupInfo) {
81: LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
82: response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
83: response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
84: return response;
85: }
86: // 校驗 消費分組信息 消息模型是否匹配
87: if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
88: && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
89: response.setCode(ResponseCode.NO_PERMISSION);
90: response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
91: return response;
92: }
93:
94: // 校驗 訂閱信息 是否存在
95: subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
96: if (null == subscriptionData) {
97: LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
98: response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
99: response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
100: return response;
101: }
102: // 校驗 訂閱信息版本 是否合法
103: if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
104: LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
105: subscriptionData.getSubString());
106: response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
107: response.setRemark("the consumer's subscription not latest");
108: return response;
109: }
110: }
111:
112: // 獲取消息
113: final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
114: requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
115: if (getMessageResult != null) {
116: response.setRemark(getMessageResult.getStatus().name());
117: responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
118: responseHeader.setMinOffset(getMessageResult.getMinOffset());
119: responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
120:
121: // TODO 待讀
122: // 計算建議讀取brokerId
123: if (getMessageResult.isSuggestPullingFromSlave()) {
124: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
125: } else {
126: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
127: }
128:
129: switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
130: case ASYNC_MASTER:
131: case SYNC_MASTER:
132: break;
133: case SLAVE:
134: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // 從節點不容許讀取,告訴consumer讀取主節點。
135: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
136: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
137: }
138: break;
139: }
140:
141: if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
142: // consume too slow ,redirect to another machine
143: if (getMessageResult.isSuggestPullingFromSlave()) {
144: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
145: }
146: // consume ok
147: else {
148: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
149: }
150: } else {
151: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
152: }
153:
154: switch (getMessageResult.getStatus()) {
155: case FOUND:
156: response.setCode(ResponseCode.SUCCESS);
157: break;
158: case MESSAGE_WAS_REMOVING:
159: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
160: break;
161: case NO_MATCHED_LOGIC_QUEUE:
162: case NO_MESSAGE_IN_QUEUE:
163: if (0 != requestHeader.getQueueOffset()) {
164: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
165:
166: // XXX: warn and notify me
167: LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
168: requestHeader.getQueueOffset(), //
169: getMessageResult.getNextBeginOffset(), //
170: requestHeader.getTopic(), //
171: requestHeader.getQueueId(), //
172: requestHeader.getConsumerGroup()//
173: );
174: } else {
175: response.setCode(ResponseCode.PULL_NOT_FOUND);
176: }
177: break;
178: case NO_MATCHED_MESSAGE:
179: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
180: break;
181: case OFFSET_FOUND_NULL:
182: response.setCode(ResponseCode.PULL_NOT_FOUND);
183: break;
184: case OFFSET_OVERFLOW_BADLY:
185: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
186: // XXX: warn and notify me
187: LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
188: break;
189: case OFFSET_OVERFLOW_ONE:
190: response.setCode(ResponseCode.PULL_NOT_FOUND);
191: break;
192: case OFFSET_TOO_SMALL:
193: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
194: LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
195: requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
196: getMessageResult.getMinOffset(), channel.remoteAddress());
197: break;
198: default:
199: assert false;
200: break;
201: }
202:
203: // hook:before
204: if (this.hasConsumeMessageHook()) {
205: ConsumeMessageContext context = new ConsumeMessageContext();
206: context.setConsumerGroup(requestHeader.getConsumerGroup());
207: context.setTopic(requestHeader.getTopic());
208: context.setQueueId(requestHeader.getQueueId());
209:
210: String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
211:
212: switch (response.getCode()) {
213: case ResponseCode.SUCCESS:
214: int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
215: int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
216:
217: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
218: context.setCommercialRcvTimes(incValue);
219: context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
220: context.setCommercialOwner(owner);
221:
222: break;
223: case ResponseCode.PULL_NOT_FOUND:
224: if (!brokerAllowSuspend) {
225:
226: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
227: context.setCommercialRcvTimes(1);
228: context.setCommercialOwner(owner);
229:
230: }
231: break;
232: case ResponseCode.PULL_RETRY_IMMEDIATELY:
233: case ResponseCode.PULL_OFFSET_MOVED:
234: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
235: context.setCommercialRcvTimes(1);
236: context.setCommercialOwner(owner);
237: break;
238: default:
239: assert false;
240: break;
241: }
242:
243: this.executeConsumeMessageHookBefore(context);
244: }
245:
246: switch (response.getCode()) {
247: case ResponseCode.SUCCESS:
248:
249: this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
250: getMessageResult.getMessageCount());
251: this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
252: getMessageResult.getBufferTotalSize());
253: this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
254: // 讀取消息
255: if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { // 內存中
256: final long beginTimeMills = this.brokerController.getMessageStore().now();
257:
258: final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
259:
260: this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
261: requestHeader.getTopic(), requestHeader.getQueueId(),
262: (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
263: response.setBody(r);
264: } else { // zero-copy
265: try {
266: FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
267: channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
268: @Override
269: public void operationComplete(ChannelFuture future) throws Exception {
270: getMessageResult.release();
271: if (!future.isSuccess()) {
272: LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
273: }
274: }
275: });
276: } catch (Throwable e) {
277: LOG.error("Error occurred when transferring messages from page cache", e);
278: getMessageResult.release();
279: }
280:
281: response = null;
282: }
283: break;
284: case ResponseCode.PULL_NOT_FOUND:
285: // 消息未查詢到 && broker容許掛起請求 && 請求容許掛起
286: if (brokerAllowSuspend && hasSuspendFlag) {
287: long pollingTimeMills = suspendTimeoutMillisLong;
288: if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
289: pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
290: }
291:
292: String topic = requestHeader.getTopic();
293: long offset = requestHeader.getQueueOffset();
294: int queueId = requestHeader.getQueueId();
295: PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
296: this.brokerController.getMessageStore().now(), offset, subscriptionData);
297: this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
298: response = null;
299: break;
300: }
301:
302: case ResponseCode.PULL_RETRY_IMMEDIATELY:
303: break;
304: case ResponseCode.PULL_OFFSET_MOVED:
305: if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
306: || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { // TODO 待博客補充
307: MessageQueue mq = new MessageQueue();
308: mq.setTopic(requestHeader.getTopic());
309: mq.setQueueId(requestHeader.getQueueId());
310: mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
311:
312: OffsetMovedEvent event = new OffsetMovedEvent();
313: event.setConsumerGroup(requestHeader.getConsumerGroup());
314: event.setMessageQueue(mq);
315: event.setOffsetRequest(requestHeader.getQueueOffset());
316: event.setOffsetNew(getMessageResult.getNextBeginOffset());
317: this.generateOffsetMovedEvent(event);
318: LOG.warn(
319: "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
320: requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
321: responseHeader.getSuggestWhichBrokerId());
322: } else {
323: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
324: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
325: LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
326: requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
327: responseHeader.getSuggestWhichBrokerId());
328: }
329:
330: break;
331: default:
332: assert false;
333: }
334: } else {
335: response.setCode(ResponseCode.SYSTEM_ERROR);
336: response.setRemark("store getMessage return null");
337: }
338:
339: // The request asks to persist the consume offset && the broker is not a slave: persist the offset.
340: boolean storeOffsetEnable = brokerAllowSuspend;
341: storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
342: storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
343: if (storeOffsetEnable) {
344: this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
345: requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
346: }
347: return response;
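348: }

- Check whether the Broker is readable.
- Check that SubscriptionGroupConfig (the subscription group configuration) exists and allows consumption.
- Decode the flag bits carried in PullMessageRequestHeader.sysFlag.
- Check that TopicConfig (the topic configuration) exists, is readable, and that the queue id is valid.
- Check that SubscriptionData (the subscription data) is valid.
- Call MessageStore#getMessage(...) to obtain a GetMessageResult (the messages). For details, see MessageStore#getMessage(...).
- Set the brokerId suggested for the next pull.
- Run the Hook logic, #executeConsumeMessageHookBefore(...).
- readGetMessageResult(...) copies the message content into heap memory and sets it as the response body.
- The zero-copy path responds directly without heap memory, which performs better. TODO: add more here after studying zero-copy.
- When no message is found (the Broker allows suspension && the request asks to be suspended), suspend the request. For details, see PullRequestHoldService.
- TODO: to be added after studying the tools module.
- Persist the consume offset (the Broker is not a slave && the request asks to persist the offset). For details, see the section on updating the consume offset.

1: /**
2: * Get the message result
3: *
4: * @param group consumer group
5: * @param topic topic
6: * @param queueId queue id
7: * @param offset queue offset
8: * @param maxMsgNums maximum number of messages
9: * @param subscriptionData subscription data
10: * @return message result
11: */
12: public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
13: final SubscriptionData subscriptionData) {
14: // Whether the store has been shut down
15: if (this.shutdown) {
16: log.warn("message store has shutdown, so getMessage is forbidden");
17: return null;
18: }
19: // Whether the store is readable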
20: if (!this.runningFlags.isReadable()) {
21: log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
22: return null;
23: }
24:
25: long beginTime = this.getSystemClock().now();
26:
27: GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
28: long nextBeginOffset = offset;
29: long minOffset = 0;
30: long maxOffset = 0;
31:
32: GetMessageResult getResult = new GetMessageResult();
33:
34: final long maxOffsetPy = this.commitLog.getMaxOffset();
35:
36: // Look up the consume queue
37: ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
38: if (consumeQueue != null) {
39: minOffset = consumeQueue.getMinOffsetInQueue(); // minimum offset in the consume queue
40: maxOffset = consumeQueue.getMaxOffsetInQueue(); // maximum offset in the consume queue
41:
42: // Check the queue offset (offset)
43: if (maxOffset == 0) { // the consume queue has no messages
44: status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
45: nextBeginOffset = nextOffsetCorrection(offset, 0);
46: } else if (offset < minOffset) { // the requested offset is too small
47: status = GetMessageStatus.OFFSET_TOO_SMALL;
48: nextBeginOffset = nextOffsetCorrection(offset, minOffset);
49: } else if (offset == maxOffset) { // the requested offset is exactly one position past the last entry
50: status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
51: nextBeginOffset = nextOffsetCorrection(offset, offset);
52: } else if (offset > maxOffset) { // the requested offset is far past the end (more than one position)
53: status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
54: if (0 == minOffset) { // TODO blog: why is 0 == minOffset special-cased here?
55: nextBeginOffset = nextOffsetCorrection(offset, minOffset);
56: } else {
57: nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
58: }
59: } else {
60: // Get the mapped buffer result (MappedFile)
61: SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
62: if (bufferConsumeQueue != null) {
63: try {
64: status = GetMessageStatus.NO_MATCHED_MESSAGE;
65:
66: long nextPhyFileStartOffset = Long.MIN_VALUE; // start offset of the next commitLog file (MappedFile)
67: long maxPhyOffsetPulling = 0; // largest physical offset pulled so far
68:
69: int i = 0;
70: final int maxFilterMessageCount = 16000;
71: final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
72: // Loop over the message position entries
73: for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
74: long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // physical offset of the message
75: int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // message length
76: long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // message tagsCode
77: // Record the largest physical offset pulled so far
78: maxPhyOffsetPulling = offsetPy;
79: // When offsetPy is less than nextPhyFileStartOffset, the corresponding Message has already been removed, so continue until a readable Message is reached.
80: if (nextPhyFileStartOffset != Long.MIN_VALUE) {
81: if (offsetPy < nextPhyFileStartOffset)
82: continue;
83: }
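84: // Check whether the message has to be read from disk, i.e. the commitLog no longer fits entirely in memory
85: boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
86: // Whether enough messages have already been collected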
87: if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
88: isInDisk)) {
89: break;
90: }
91: // Check whether the message matches the subscription
92: if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
93: // Fetch the message ByteBuffer from the commitLog
94: SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
95: if (selectResult != null) {
96: this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
97: getResult.addMessage(selectResult);
98: status = GetMessageStatus.FOUND;
99: nextPhyFileStartOffset = Long.MIN_VALUE;
100: } else {
101: // The message cannot be read from the commitLog, which means its file (MappedFile) has been deleted; compute the start offset of the next MappedFile
102: if (getResult.getBufferTotalSize() == 0) {
103: status = GetMessageStatus.MESSAGE_WAS_REMOVING;
104: }
105: nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
106: }
107: } else {
108: if (getResult.getBufferTotalSize() == 0) {
109: status = GetMessageStatus.NO_MATCHED_MESSAGE;
110: }
111:
112: if (log.isDebugEnabled()) {
113: log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
114: }
115: }
116: }
117: // Record how many bytes are still left to pull
118: if (diskFallRecorded) {
119: long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
120: brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
121: }
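122: // Compute the consume queue offset for the next pull
123: nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
124: // Based on the bytes left to pull and the memory size, decide whether to suggest pulling from the slave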
125: long diff = maxOffsetPy - maxPhyOffsetPulling;
126: long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
127: * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
128: getResult.setSuggestPullingFromSlave(diff > memory);
129: } finally {
130: bufferConsumeQueue.release();
131: }
132: } else {
133: status = GetMessageStatus.OFFSET_FOUND_NULL;
134: nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
135: log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
136: + maxOffset + ", but access logic queue failed.");
137: }
138: }
139: } else {
140: status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
141: nextBeginOffset = nextOffsetCorrection(offset, 0);
142: }
143: // Statistics
144: if (GetMessageStatus.FOUND == status) {
145: this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
146: } else {
147: this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
148: }
149: long eclipseTime = this.getSystemClock().now() - beginTime;
150: this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
151: // Fill in the result
152: getResult.setStatus(status);
153: getResult.setNextBeginOffset(nextBeginOffset);
154: getResult.setMaxOffset(maxOffset);
155: getResult.setMinOffset(minOffset);
156: return getResult;
157: }
158:
159: /**
160: * Get the consume queue by topic + queue id
161: *
162: * @param topic topic
163: * @param queueId queue id
164: * @return consume queue
165: */
166: public ConsumeQueue findConsumeQueue(String topic, int queueId) {
167: // Get all consume queues of the topic
168: ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
169: if (null == map) {
170: ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<>(128);
171: ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
172: if (oldMap != null) {
173: map = oldMap;
174: } else {
175: map = newMap;
176: }
177: }
178: // Get the consume queue for the queueId
179: ConsumeQueue logic = map.get(queueId);
180: if (null == logic) {
181: ConsumeQueue newLogic = new ConsumeQueue(//
182: topic, //
183: queueId, //
184: StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
185: this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
186: this);
187: ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
188: if (oldLogic != null) {
189: logic = oldLogic;
190: } else {
191: logic = newLogic;
192: }
193: }
194:
195: return logic;
196: }
197:
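198: /**
199: * Correct the queue offset for the next pull.
200: * Correction applies on a master node, or on a slave node with the offset check switch enabled.
201: *
202: * @param oldOffset old queue offset
203: * @param newOffset new queue offset
204: * @return corrected queue offset
205: */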
206: private long nextOffsetCorrection(long oldOffset, long newOffset) {
207: long nextOffset = oldOffset;
208: if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
209: nextOffset = newOffset;
210: }
211: return nextOffset;
212: }
213:
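214: /**
215: * Check whether the message has to be read from disk, i.e. the commitLog no longer fits entirely in memory
216: *
217: * @param offsetPy the given commitLog offset
218: * @param maxOffsetPy the maximum commitLog offset
219: * @return whether disk access is needed
220: */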
221: private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
222: long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
223: return (maxOffsetPy - offsetPy) > memory;
224: }
225:
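226: /**
227: * Check whether the batch of messages being collected is already full
228: *
229: * @param sizePy size in bytes
230: * @param maxMsgNums maximum number of messages
231: * @param bufferTotal bytes counted so far
232: * @param messageTotal messages counted so far
233: * @param isInDisk whether the message is on disk
234: * @return whether the batch is full
235: */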
236: private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
237: if (0 == bufferTotal || 0 == messageTotal) {
238: return false;
239: }
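240: // The message count already satisfies the requested count (maxMsgNums)
241: if ((messageTotal + 1) >= maxMsgNums) {
242: return true;
243: }
244: // Check against the store's configured maximum transfer bytes and maximum transfer count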
245: if (isInDisk) {
246: if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
247: return true;
248: }
249:
250: if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
251: return true;
252: }
253: } else {
254: if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
255: return true;
256: }
257:
258: if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
259: return true;
260: }
261: }
262:
263: return false;
264: }
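
The offset checks on lines 42 to 58 of the listing above can be condensed into a small pure function. This is only a sketch for reading along, not RocketMQ code: the class `OffsetCheckSketch`, the `READABLE` marker and the `correctionEnabled` flag (standing in for "not a slave, or offsetCheckInSlave is on") are names assumed here for illustration.

```java
final class OffsetCheckSketch {
    /** READABLE is a hypothetical marker for the branch that goes on to read the queue. */
    enum Status { NO_MESSAGE_IN_QUEUE, OFFSET_TOO_SMALL, OFFSET_OVERFLOW_ONE, OFFSET_OVERFLOW_BADLY, READABLE }

    /** Classifies the requested offset in the same order as the checks in getMessage(...). */
    static Status classify(long offset, long minOffset, long maxOffset) {
        if (maxOffset == 0) {
            return Status.NO_MESSAGE_IN_QUEUE;
        } else if (offset < minOffset) {
            return Status.OFFSET_TOO_SMALL;
        } else if (offset == maxOffset) {
            return Status.OFFSET_OVERFLOW_ONE;
        } else if (offset > maxOffset) {
            return Status.OFFSET_OVERFLOW_BADLY;
        }
        return Status.READABLE;
    }

    /** Mirrors the nextOffsetCorrection(...) calls made in each unreadable branch. */
    static long nextBeginOffset(long offset, long minOffset, long maxOffset, boolean correctionEnabled) {
        long corrected;
        switch (classify(offset, minOffset, maxOffset)) {
            case NO_MESSAGE_IN_QUEUE:
                corrected = 0;
                break;
            case OFFSET_TOO_SMALL:
                corrected = minOffset;
                break;
            case OFFSET_OVERFLOW_BADLY:
                corrected = (minOffset == 0) ? minOffset : maxOffset;
                break;
            default:
                // OFFSET_OVERFLOW_ONE keeps the offset; READABLE is left unchanged in this sketch
                // (the real method advances it after scanning the queue).
                corrected = offset;
        }
        // Without correction the client is simply told to retry from where it asked.
        return correctionEnabled ? corrected : offset;
    }

    public static void main(String[] args) {
        System.out.println(classify(25, 10, 20));
        System.out.println(nextBeginOffset(25, 10, 20, true));
    }
}
```

Keeping the classification separate from the correction makes it easy to see that a slave without the offset check never moves the client's offset.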
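
- Get up to a given number (maxMsgNums) of messages (Message) by consumer group (group) + topic (Topic) + queue id (queueId) + queue offset (offset) + subscription data (subscriptionData).
- Check whether the Store has been shut down; if it has, no messages can be fetched.
- Get the consume queue (ConsumeQueue) by topic (Topic) + queue id (queueId).
  - #findConsumeQueue(...): lines 159 to 196.
- Handle the cases where nothing can be read at the queue offset (offset), and for each case compute the position the Client should pull from next.
  - The consume queue has no messages.
  - The queue offset (offset) is too small.
  - The queue offset (offset) is exactly equal to the maximum offset of the consume queue. This is normal and amounts to asking for the newest message.
  - The queue offset (offset) is too far past the maximum.
  - #nextOffsetCorrection(...): lines 198 to 212.
- Get the MappedFile that corresponds to the queue offset (offset).
- Loop over the message position entries.
  - Read one message position entry.
  - When offsetPy is less than nextPhyFileStartOffset, the corresponding Message has already been removed, so continue until a readable Message is reached.
  - Check whether the message has to be read from disk. #checkInDiskByCommitOffset(...): lines 214 to 224.
  - Check whether enough messages have been collected. #isTheBatchFull(...): lines 226 to 264.
  - Fetch the message's MappedByteBuffer from the CommitLog.
    - Fetching the message MappedByteBuffer succeeded.
    - Fetching the message MappedByteBuffer failed. If the message cannot be read from the CommitLog, the file (MappedFile) that held it has been deleted, so the start offset of the next MappedFile is computed. This logic needs to be read together with lines 79 to 83.
- Release bufferConsumeQueue's reference to the MappedFile. The MappedFile here is a file of the ConsumeQueue, not a file under the CommitLog.
- When the MappedFile for the queue offset (offset) is null, compute the position of the ConsumeQueue's next MappedFile starting from offset.

1: public class DefaultMessageFilter implements MessageFilter {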
2:
3: @Override
4: public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
5: // Message tagsCode is null
6: if (tagsCode == null) {
7: return true;
8: }
9: // Subscription data is null
10: if (null == subscriptionData) {
11: return true;
12: }
13: // classFilter
14: if (subscriptionData.isClassFilterMode())
15: return true;
16: // The subscription expression matches everything
17: if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
18: return true;
19: }
20: // Whether the subscription's code set contains the message tagsCode
21: return subscriptionData.getCodeSet().contains(tagsCode.intValue());
22: }
23:
24: }
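
The filter above compares integer codes rather than tag strings. A minimal sketch of that idea follows, assuming (this is not shown in the listing) that a tag's code is its `String.hashCode()`, that `*` means "subscribe to everything", and that tags in an expression are separated by `||`. The class `TagFilterSketch` is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class TagFilterSketch {
    private final Set<Integer> codeSet = new HashSet<>();
    private final boolean subscribeAll;

    /** Builds the code set from an expression such as "TagA || TagB" (assumed syntax). */
    TagFilterSketch(String subExpression) {
        this.subscribeAll = "*".equals(subExpression);
        if (!subscribeAll) {
            for (String tag : subExpression.split("\\|\\|")) {
                String trimmed = tag.trim();
                if (!trimmed.isEmpty()) {
                    codeSet.add(trimmed.hashCode()); // assumption: tag code == String.hashCode()
                }
            }
        }
    }

    /** Same decision order as the listing: null code and match-all pass, otherwise look up the code. */
    boolean isMessageMatched(Long tagsCode) {
        if (tagsCode == null || subscribeAll) {
            return true;
        }
        return codeSet.contains(tagsCode.intValue());
    }
}
```

Because only hash codes are compared, two different tags with the same hash code (for example "Aa" and "BB" in Java) are indistinguishable at this level, so a filter of this kind can let through messages that a string comparison would reject.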
1: public class PullRequestHoldService extends ServiceThread {
2:
3: private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4:
5: private static final String TOPIC_QUEUEID_SEPARATOR = "@";
6:
7: private final BrokerController brokerController;
8:
9: private final SystemClock systemClock = new SystemClock();
10: /**
11: * Message filter
12: */
13: private final MessageFilter messageFilter = new DefaultMessageFilter();
14: /**
15: * Table of held pull requests
16: */
17: private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
18: new ConcurrentHashMap<>(1024);
19:
20: public PullRequestHoldService(final BrokerController brokerController) {
21: this.brokerController = brokerController;
22: }
23:
24: /**
25: * Add a pull request to be held
26: *
27: * @param topic topic
28: * @param queueId queue id
29: * @param pullRequest pull request
30: */
31: public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
32: String key = this.buildKey(topic, queueId);
33: ManyPullRequest mpr = this.pullRequestTable.get(key);
34: if (null == mpr) {
35: mpr = new ManyPullRequest();
36: ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
37: if (prev != null) {
38: mpr = prev;
39: }
40: }
41:
42: mpr.addPullRequest(pullRequest);
43: }
44:
45: /**
46: * Build a unique key from topic + queue id
47: *
48: * @param topic topic
49: * @param queueId queue id
50: * @return key
51: */
52: private String buildKey(final String topic, final int queueId) {
53: StringBuilder sb = new StringBuilder();
54: sb.append(topic);
55: sb.append(TOPIC_QUEUEID_SEPARATOR);
56: sb.append(queueId);
57: return sb.toString();
58: }
59:
60: @Override
61: public void run() {
62: log.info("{} service started", this.getServiceName());
63: while (!this.isStopped()) {
64: try {
65: // Use a different wait time for long polling and short polling
66: if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
67: this.waitForRunning(5 * 1000);
68: } else {
69: this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
70: }
71: // Check whether any held request needs to be notified
72: long beginLockTimestamp = this.systemClock.now();
73: this.checkHoldRequest();
74: long costTime = this.systemClock.now() - beginLockTimestamp;
75: if (costTime > 5 * 1000) {
76: log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
77: }
78: } catch (Throwable e) {
79: log.warn(this.getServiceName() + " service has exception. ", e);
80: }
81: }
82:
83: log.info("{} service end", this.getServiceName());
84: }
85:
86: @Override
87: public String getServiceName() {
88: return PullRequestHoldService.class.getSimpleName();
89: }
90:
91: /**
92: * Iterate over the held requests and check whether any of them needs to be notified.
93: */
94: private void checkHoldRequest() {
95: for (String key : this.pullRequestTable.keySet()) {
96: String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
97: if (2 == kArray.length) {
98: String topic = kArray[0];
99: int queueId = Integer.parseInt(kArray[1]);
100: final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
101: try {
102: this.notifyMessageArriving(topic, queueId, offset);
103: } catch (Throwable e) {
104: log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
105: }
106: }
107: }
108: }
109:
110: /**
111: * Check whether any request needs to be notified
112: *
113: * @param topic topic
114: * @param queueId queue id
115: * @param maxOffset maximum offset of the consume queue
116: */
117: public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
118: notifyMessageArriving(topic, queueId, maxOffset, null);
119: }
120:
121: /**
122: * Check whether any request needs to be notified
123: *
124: * @param topic topic
125: * @param queueId queue id
126: * @param maxOffset maximum offset of the consume queue
127: * @param tagsCode tagsCode used for filtering
128: */
129: public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
130: String key = this.buildKey(topic, queueId);
131: ManyPullRequest mpr = this.pullRequestTable.get(key);
132: if (mpr != null) {
133: //
134: List<PullRequest> requestList = mpr.cloneListAndClear();
135: if (requestList != null) {
136: List<PullRequest> replayList = new ArrayList<>(); // requests that do not qualify for wake-up
137:
138: for (PullRequest request : requestList) {
139: // If maxOffset is too small, read the latest value once more.
140: long newestOffset = maxOffset;
141: if (newestOffset <= request.getPullFromThisOffset()) {
142: newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
143: }
144: // A new matching message has arrived: wake the request up, i.e. pull again.
145: if (newestOffset > request.getPullFromThisOffset()) {
146: if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
147: try {
148: this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
149: request.getRequestCommand());
150: } catch (Throwable e) {
151: log.error("execute request when wakeup failed.", e);
152: }
153: continue;
154: }
155: }
156: // The hold time has expired: wake the request up, i.e. pull again.
157: if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
158: try {
159: this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
160: request.getRequestCommand());
161: } catch (Throwable e) {
162: log.error("execute request when wakeup failed.", e);
163: }
164: continue;
165: }
166: // Requests that do not qualify for another pull are put back
167: replayList.add(request);
168: }
169: // Put them back
170: if (!replayList.isEmpty()) {
171: mpr.addPullRequest(replayList);
172: }
173: }
174: }
175: }
176: }
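
The hold-and-wake cycle of the listing above can be reduced to a small in-memory table. The sketch below is an illustration under simplifying assumptions, not the Broker's implementation: `HoldTableSketch` and `HeldRequest` are hypothetical names, tag filtering and the re-read of the maximum offset are left out, and the caller passes the current time in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class HoldTableSketch {
    /** Hypothetical stand-in for a held pull request. */
    static final class HeldRequest {
        final String id;
        final long pullFromOffset;
        final long suspendedAtMillis;
        final long timeoutMillis;

        HeldRequest(String id, long pullFromOffset, long suspendedAtMillis, long timeoutMillis) {
            this.id = id;
            this.pullFromOffset = pullFromOffset;
            this.suspendedAtMillis = suspendedAtMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Keyed by "topic@queueId", like pullRequestTable in the listing.
    private final Map<String, List<HeldRequest>> table = new ConcurrentHashMap<>();

    private static String key(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    void suspend(String topic, int queueId, HeldRequest request) {
        List<HeldRequest> list = table.computeIfAbsent(key(topic, queueId), k -> new ArrayList<>());
        synchronized (list) {
            list.add(request);
        }
    }

    /** Returns the requests to wake: those with new messages available or whose hold time expired. */
    List<HeldRequest> notifyMessageArriving(String topic, int queueId, long maxOffset, long nowMillis) {
        List<HeldRequest> woken = new ArrayList<>();
        List<HeldRequest> list = table.get(key(topic, queueId));
        if (list == null) {
            return woken;
        }
        synchronized (list) {
            Iterator<HeldRequest> it = list.iterator();
            while (it.hasNext()) {
                HeldRequest r = it.next();
                boolean hasNewMessage = maxOffset > r.pullFromOffset;
                boolean timedOut = nowMillis >= r.suspendedAtMillis + r.timeoutMillis;
                if (hasNewMessage || timedOut) {
                    woken.add(r);
                    it.remove(); // everything else stays held for the next check
                }
            }
        }
        return woken;
    }
}
```

A request that is neither satisfied nor expired simply stays in the table, which plays the role of the replayList that the listing adds back after each pass.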
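
- PullRequestHoldService: a service thread that maintains held (suspended) pull requests.
- #suspendPullRequest(...): adds a pull request to the hold table (pullRequestTable).
- #run(...): periodically checks whether any held request should be notified to pull again, and notifies it.
  - The wait time depends on whether long polling or short polling is in use.
- #checkHoldRequest(...): iterates over the held requests and checks whether any of them needs to be notified.
- #notifyMessageArriving(...): checks whether a given queue has requests that need to be notified.
  - If maxOffset is too small, the latest value is fetched once more.
  - Waking a request calls #executeRequestWhenWakeup(...), which hands the pull to a thread pool and runs it asynchronously, so it causes no performance problem. For details, see PullMessageProcessor#executeRequestWhenWakeup(...).
  - Requests that do not qualify for wake-up are added back to the table (pullRequestTable).

1: public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {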
2: Runnable run = new Runnable() {
3: @Override
4: public void run() {
5: try {
6: // Invoke the pull request. This invocation is made with suspension disabled.
7: final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
8:
9: if (response != null) {
10: response.setOpaque(request.getOpaque());
11: response.markResponseType();
12: try {
13: channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
14: @Override
15: public void operationComplete(ChannelFuture future) throws Exception {
16: if (!future.isSuccess()) {
17: LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
18: LOG.error(request.toString());
19: LOG.error(response.toString());
20: }
21: }
22: });
23: } catch (Throwable e) {
24: LOG.error("ProcessRequestWrapper process request over, but response failed", e);
25: LOG.error(request.toString());
26: LOG.error(response.toString());
27: }
28: }
29: } catch (RemotingCommandException e1) {
30: LOG.error("ExecuteRequestWhenWakeup run", e1);
31: }
32: }
33: };
34: // Submit the pull request to the thread pool
35: this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
36: }
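
Line 7 of the listing above passes false as the last argument of processRequest(...). The effect of that flag can be sketched in isolation. `WakeupSketch` and its string results are hypothetical stand-ins, and the thread pool is left out so that only the decision itself is visible.

```java
final class WakeupSketch {
    /**
     * Hypothetical stand-in for processRequest(...): returns null when the
     * request is parked, otherwise a short description of the response.
     */
    static String process(boolean messageFound, boolean brokerAllowSuspend, boolean hasSuspendFlag) {
        if (messageFound) {
            return "FOUND";
        }
        if (brokerAllowSuspend && hasSuspendFlag) {
            return null; // parked: no response is written yet
        }
        return "PULL_NOT_FOUND";
    }

    /** On wake-up the request is re-run with suspension disabled, so it always produces a response. */
    static String onWakeup(boolean messageFound, boolean hasSuspendFlag) {
        return process(messageFound, false, hasSuspendFlag);
    }
}
```

With suspension disabled, a woken request that still finds nothing is answered with "not found" instead of being parked a second time.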

- Suspension is disabled for this invocation, which keeps the Broker from looping forever.

Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak
8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
"offsetTable":{
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"TopicRead3@please_rename_unique_group_name_4":{1:5
}
}
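}

- consumerOffset.json: the file that stores consume offsets.
- consumerOffset.json.bak: the backup of the consume offset file.
- Each time consumerOffset.json is written, its previous content is backed up to consumerOffset.json.bak. For the implementation, see MixAll#string2File(...).

1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {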
2: @Override
3: public void run() {
4: try {
5: BrokerController.this.consumerOffsetManager.persist();
6: } catch (Throwable e) {
7: log.error("schedule persist consumerOffset error.", e);
8: }
9: }
10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
1: public abstract class ConfigManager {
2: private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
3:
4: /**
5: * Encode the content
6: * @return the encoded content
7: */
8: public abstract String encode();
9:
10: /**
11: * Load the file
12: *
13: * @return whether loading succeeded
14: */
15: public boolean load() {
16: String fileName = null;
17: try {
18: fileName = this.configFilePath();
19: String jsonString = MixAll.file2String(fileName);
20: // If there is no content, load the backup file
21: if (null == jsonString || jsonString.length() == 0) {
22: return this.loadBak();
23: } else {
24: this.decode(jsonString);
25: PLOG.info("load {} OK", fileName);
26: return true;
27: }
28: } catch (Exception e) {
29: PLOG.error("load " + fileName + " Failed, and try to load backup file", e);
30: return this.loadBak();
31: }
32: }
33:
34: /**
35: * Path of the config file
36: *
37: * @return path of the config file
38: */
39: public abstract String configFilePath();
40:
41: /**
42: * Load the backup file
43: *
44: * @return whether loading succeeded
45: */
46: private boolean loadBak() {
47: String fileName = null;
48: try {
49: fileName = this.configFilePath();
50: String jsonString = MixAll.file2String(fileName + ".bak");
51: if (jsonString != null && jsonString.length() > 0) {
52: this.decode(jsonString);
53: PLOG.info("load " + fileName + " OK");
54: return true;
55: }
56: } catch (Exception e) {
57: PLOG.error("load " + fileName + " Failed", e);
58: return false;
59: }
60:
61: return true;
62: }
63:
64: /**
65: * Decode the content
66: *
67: * @param jsonString content
68: */
69: public abstract void decode(final String jsonString);
70:
71: /**
72: * Persist
73: */
74: public synchronized void persist() {
75: String jsonString = this.encode(true);
76: if (jsonString != null) {
77: String fileName = this.configFilePath();
78: try {
79: MixAll.string2File(jsonString, fileName);
80: } catch (IOException e) {
81: PLOG.error("persist file Exception, " + fileName, e);
82: }
83: }
84: }
85:
86: /**
87: * Encode the content to be stored
88: *
89: * @param prettyFormat whether to pretty-print
90: * @return content
91: */
92: public abstract String encode(final boolean prettyFormat);
93: }
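1: /**
2: * Write the content to a file
3: * Safe write
4: * 1. Write to the .tmp file
5: * 2. Back up the file about to be overwritten to the .bak file
6: * 3. Delete the original file and rename the .tmp file to the file name
7: *
8: * @param str content
9: * @param fileName file name
10: * @throws IOException when an IO error occurs
11: */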
12: public static void string2File(final String str, final String fileName) throws IOException {
13: // Write to the .tmp file
14: String tmpFile = fileName + ".tmp";
15: string2FileNotSafe(str, tmpFile);
16: // Back up the previous content to the .bak file
17: String bakFile = fileName + ".bak";
18: String prevContent = file2String(fileName);
19: if (prevContent != null) {
20: string2FileNotSafe(prevContent, bakFile);
21: }
22:
23: File file = new File(fileName);
24: file.delete();
25:
26: file = new File(tmpFile);
27: file.renameTo(new File(fileName));
28: }
29:
30: /**
31: * Write the content to a file
32: * Unsafe write
33: *
34: * @param str content
35: * @param fileName file name
36: * @throws IOException when an IO error occurs
37: */
38: public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
39: File file = new File(fileName);
40: // Create the parent directory
41: File fileParent = file.getParentFile();
42: if (fileParent != null) {
43: fileParent.mkdirs();
44: }
45: // Write the content
46: FileWriter fileWriter = null;
47: try {
48: fileWriter = new FileWriter(file);
49: fileWriter.write(str);
50: } catch (IOException e) {
51: throw e;
52: } finally {
53: if (fileWriter != null) {
54: fileWriter.close();
55: }
56: }
57: }
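
The three steps of the safe write above (write .tmp, back up to .bak, swap the .tmp file in) can also be expressed with java.nio.file. The sketch below is an alternative formulation for comparison, not what MixAll does: `SafeWriteSketch` and `demo()` are hypothetical, and the single `Files.move` replaces the separate delete and rename, on the assumption that the file system supports an atomic move within one directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class SafeWriteSketch {
    static void write(Path target, String content) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Path bak = target.resolveSibling(target.getFileName() + ".bak");

        // 1. Write the new content to the .tmp file.
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // 2. Keep the previous content as the .bak file.
        if (Files.exists(target)) {
            Files.copy(target, bak, StandardCopyOption.REPLACE_EXISTING);
        }
        // 3. Swap the .tmp file in; fall back to a plain move where atomic moves are unsupported.
        try {
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Writes twice into a temp directory and returns {current content, backup content}. */
    static String[] demo() {
        try {
            Path dir = Files.createTempDirectory("safe-write-sketch");
            Path target = dir.resolve("consumerOffset.json");
            write(target, "v1");
            write(target, "v2");
            String current = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
            String backup = new String(Files.readAllBytes(dir.resolve("consumerOffset.json.bak")), StandardCharsets.UTF_8);
            return new String[] {current, backup};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Either way the point is the same: if the process dies halfway, either the old file or the .bak copy is still readable, which is what ConfigManager#load() relies on when it falls back to the backup.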
1: public class ConsumerOffsetManager extends ConfigManager {
2: private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3: private static final String TOPIC_GROUP_SEPARATOR = "@";
4:
5: /**
6: * Table of consume offsets
7: */
8: private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);
9:
10: private transient BrokerController brokerController;
11:
12: public ConsumerOffsetManager() {
13: }
14:
15: public ConsumerOffsetManager(BrokerController brokerController) {
16: this.brokerController = brokerController;
17: }
18:
19: /**
20: * Commit the consume offset
21: *
22: * @param clientHost address of the committing client
23: * @param group consumer group
24: * @param topic topic
25: * @param queueId queue id
26: * @param offset offset (position in the queue)
27: */
28: public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
29: // topic@group
30: String key = topic + TOPIC_GROUP_SEPARATOR + group;
31: this.commitOffset(clientHost, key, queueId, offset);
32: }
33:
34: /**
35: * Commit the consume offset
36: *
37: * @param clientHost address of the committing client
38: * @param key topic@consumer group
39: * @param queueId queue id
40: * @param offset offset (position in the queue)
41: */
42: private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
43: ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
44: if (null == map) {
45: map = new ConcurrentHashMap<>(32);
46: map.put(queueId, offset);
47: this.offsetTable.put(key, map);
48: } else {
49: Long storeOffset = map.put(queueId, offset);
50: if (storeOffset != null && offset < storeOffset) {
51: log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
52: }
53: }
54: }
55:
56: public String encode() {
57: return this.encode(false);
58: }
59:
60: @Override
61: public String configFilePath() {
62: return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
63: }
64:
65: /**
66: * Decode the content
67: * Format: JSON
68: *
69: * @param jsonString content
70: */
71: @Override
72: public void decode(String jsonString) {
73: if (jsonString != null) {
74: ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
75: if (obj != null) {
76: this.offsetTable = obj.offsetTable;
77: }
78: }
79: }
80:
81: /**
82: * Encode the content
83: * Format: JSON
84: *
85: * @param prettyFormat whether to pretty-print
86: * @return the encoded content
87: */
88: public String encode(final boolean prettyFormat) {
89: return RemotingSerializable.toJson(this, prettyFormat);
90: }
91:
92: }
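
The offset table in the listing above is a two-level map: "topic@group" maps to (queueId maps to offset). A minimal sketch of that structure follows. `OffsetTableSketch`, its `queryOffset` helper and the use of -1 for "nothing stored" are assumptions made for illustration, and `computeIfAbsent` is a deliberate variation on the get-then-put in the listing so that two threads cannot each install their own inner map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class OffsetTableSketch {
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> offset)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    /** Stores the offset and returns the previously stored one, or -1 if there was none. */
    long commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        Long previous = offsetTable
                .computeIfAbsent(key, k -> new ConcurrentHashMap<>())
                .put(queueId, offset);
        return previous == null ? -1L : previous;
    }

    /** Hypothetical read helper: returns the stored offset, or -1 if nothing is stored. */
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> queues = offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (queues == null) {
            return -1L;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1L : offset;
    }
}
```

Returning the previous value lets the caller notice when a commit moves the offset backwards, which is the situation the listing logs with the [NOTIFYME] warning.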
Most of the logic is similar to the Broker's [receive message] interface, so it helps to read that material first.

1: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
2: throws RemotingCommandException {
3:
4: // Initialize the response
5: final RemotingCommand response = RemotingCommand.createResponseCommand(null);
6: final ConsumerSendMsgBackRequestHeader requestHeader =
7: (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
8:
9: // hook (specific to this method)
10: if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
11:
12: ConsumeMessageContext context = new ConsumeMessageContext();
13: context.setConsumerGroup(requestHeader.getGroup());
14: context.setTopic(requestHeader.getOriginTopic());
15: context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
16: context.setCommercialRcvTimes(1);
17: context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
18:
19: this.executeConsumeMessageHookAfter(context);
20: }
21:
22: // Check that the consumer group exists (specific to this method)
23: SubscriptionGroupConfig subscriptionGroupConfig =
24: this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
25: if (null == subscriptionGroupConfig) {
26: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
27: response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
28: + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
29: return response;
30: }
31:
32: // Check whether the broker has write permission
33: if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
34: response.setCode(ResponseCode.NO_PERMISSION);
35: response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
36: return response;
37: }
38:
39: // Check that the number of retry queues is greater than 0 (specific to this method)
40: if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
41: response.setCode(ResponseCode.SUCCESS);
42: response.setRemark(null);
43: return response;
44: }
45:
46: // Compute the retry Topic
47: String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
48:
49: // Compute the queue id (specific to this method)
50: int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
51:
52: // Compute sysFlag (specific to this method)
53: int topicSysFlag = 0;
54: if (requestHeader.isUnitMode()) {
55: topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
56: }
57:
58: // Get the topicConfig; create it if it cannot be found
59: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
60: newTopic, //
61: subscriptionGroupConfig.getRetryQueueNums(), //
62: PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
63: if (null == topicConfig) { // no configuration
64: response.setCode(ResponseCode.SYSTEM_ERROR);
65: response.setRemark("topic[" + newTopic + "] not exist");
66: return response;
67: }
68: if (!PermName.isWriteable(topicConfig.getPerm())) { // writing is not allowed
69: response.setCode(ResponseCode.NO_PERMISSION);
70: response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
71: return response;
72: }
73:
74: // Look up the message. If it does not exist, return an error. (specific to this method)
75: MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
76: if (null == msgExt) {
77: response.setCode(ResponseCode.SYSTEM_ERROR);
78: response.setRemark("look message by offset failed, " + requestHeader.getOffset());
79: return response;
80: }
81:
82: // Set retryTopic in the extended properties (specific to this method)
83: final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
84: if (null == retryTopic) {
85: MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
86: }
87:
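88: // Do not wait for the store to complete (specific to this method). TODO question: if the message does not wait for storage while the broker is configured for synchronous flush, does that rule out batch commit?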
89: msgExt.setWaitStoreMsgOK(false);
90:
91: // Handle delayLevel (specific to this method).
92: int delayLevel = requestHeader.getDelayLevel();
93: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
94: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
95: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
96: }
97: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
98: || delayLevel < 0) { // If the maximum consume count is exceeded, the topic becomes "%DLQ%" + group name, i.e. the message goes to the dead letter queue (Dead Letter Queue)
99: newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
100: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
101:
102: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
103: DLQ_NUMS_PER_GROUP, //
104: PermName.PERM_WRITE, 0
105: );
106: if (null == topicConfig) {
107: response.setCode(ResponseCode.SYSTEM_ERROR);
108: response.setRemark("topic[" + newTopic + "] not exist");
109: return response;
110: }
111: } else {
112: if (0 == delayLevel) {
113: delayLevel = 3 + msgExt.getReconsumeTimes();
114: }
115: msgExt.setDelayTimeLevel(delayLevel);
116: }
117:
118: // Create the MessageExtBrokerInner
119: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
120: msgInner.setTopic(newTopic);
121: msgInner.setBody(msgExt.getBody());
122: msgInner.setFlag(msgExt.getFlag());
123: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
124: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
125: msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
126: msgInner.setQueueId(queueIdInt);
127: msgInner.setSysFlag(msgExt.getSysFlag());
128: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
129: msgInner.setBornHost(msgExt.getBornHost());
130: msgInner.setStoreHost(this.getStoreHost());
131: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
132:
133: // Store the original message ID in an extended property (unique)
134: String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
135: MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
136:
137: // Store the message
138: PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
139: if (putMessageResult != null) {
140: switch (putMessageResult.getPutMessageStatus()) {
141: case PUT_OK:
142: String backTopic = msgExt.getTopic();
143: String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
144: if (correctTopic != null) {
145: backTopic = correctTopic;
146: }
147:
148: this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
149:
150: response.setCode(ResponseCode.SUCCESS);
151: response.setRemark(null);
152:
153: return response;
154: default:
155: break;
156: }
157:
158: response.setCode(ResponseCode.SYSTEM_ERROR);
159: response.setRemark(putMessageResult.getPutMessageStatus().name());
160: return response;
161: }
162:
163: response.setCode(ResponseCode.SYSTEM_ERROR);
164: response.setRemark("putMessageResult is null");
165: return response;
166: }
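The branch that chooses between the retry path and the Dead Letter Queue is easier to follow in isolation. Below is a minimal sketch of that decision only, not the Broker's actual code: the class SendBackRouting, the Route holder, and the parameters retryTopic, retryQueueId and dlqQueues are hypothetical names introduced for illustration. dlqQueues stands in for DLQ_NUMS_PER_GROUP, whose value is not shown in the listing.

```java
import java.util.Random;

public class SendBackRouting {

    /** Hypothetical holder for the outcome of the routing decision. */
    public static final class Route {
        public final String topic;
        public final int queueId;
        public final int delayLevel;   // 0 means "not set" in this sketch
        public final boolean deadLetter;

        Route(String topic, int queueId, int delayLevel, boolean deadLetter) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
            this.deadLetter = deadLetter;
        }
    }

    /**
     * Mirrors the branch in the listing: too many consume attempts, or a
     * negative delayLevel, sends the message to "%DLQ%" + group; otherwise
     * it stays on the retry topic with a delay level.
     */
    public static Route route(String retryTopic, int retryQueueId, String group,
                              int reconsumeTimes, int maxReconsumeTimes,
                              int delayLevel, int dlqQueues, Random random) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            // Same arithmetic as the listing: a random, non-negative queue id.
            int queueId = Math.abs(random.nextInt() % 99999999) % dlqQueues;
            // The listing does not call setDelayTimeLevel on this branch.
            return new Route("%DLQ%" + group, queueId, 0, true);
        }
        // delayLevel == 0 means "let the Broker decide": 3 + attempts so far.
        int level = (delayLevel == 0) ? 3 + reconsumeTimes : delayLevel;
        return new Route(retryTopic, retryQueueId, level, false);
    }

    public static void main(String[] args) {
        Random random = new Random();
        Route retry = route("%RETRY%group-a", 0, "group-a", 2, 16, 0, 1, random);
        Route dead = route("%RETRY%group-a", 0, "group-a", 16, 16, 0, 1, random);
        System.out.println(retry.topic + " delayLevel=" + retry.delayLevel);
        System.out.println(dead.topic + " queueId=" + dead.queueId);
    }
}
```

Because the default delay level is derived from the number of previous attempts, each further failure pushes the next redelivery one delay level later until the retry limit routes the message to the Dead Letter Queue.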
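- When a Consumer fails to consume a message, it calls this interface to send the message back. The Broker stores the sent-back message, so the next time the Consumer pulls it, it can be read sequentially from CommitLog and ConsumeQueue.
- Most of the logic closely mirrors how the Broker receives ordinary messages; the steps specific to this method are flagged as unique in the code comments.
- Check whether the Broker has write permission.
- Compute retryQueueNums.
- Compute sysFlag.
- Get the TopicConfig. If it does not exist, create it.
- Look up the message. If it does not exist, return an error.
- Set retryTopic in the message's extended properties.
- Set the message not to wait for the store to complete.
  - When the Broker's flush mode is synchronous, this prevents synchronous flushes from being committed in batches. Could that cause a problem? If anyone knows, please let me know. 😈
- Handle delayLevel.
- Create the MessageExtBrokerInner.
- Store the message.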
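Thank you for reading, bookmarking, and liking this article.
😈 My apologies if anything in this analysis is wrong or misleading. If it is convenient, feel free to add me on QQ: 7685413 so we can talk one on one.
Thanks again.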