🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: 後端
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- Producer 發送消息。主要是同步發送消息的源碼;涉及到異步 / Oneway 發送消息、事務消息的部分會跳過。
- Broker 接收消息。(存儲消息在《RocketMQ 源碼分析 —— Message 存儲》解析)

![]()
Producer發送消息全局順序圖
![]()
Producer發送消息順序圖
/**
 * Sends a message by delegating to {@code defaultMQProducerImpl.send(msg)}.
 *
 * @param msg message to send
 * @return whatever the delegate returns
 * @throws MQClientException propagated from the delegate
 * @throws RemotingException propagated from the delegate
 * @throws MQBrokerException propagated from the delegate
 * @throws InterruptedException propagated from the delegate
 */
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}
`DefaultMQProducer#send(Message)` 對 `DefaultMQProducerImpl#send(Message)` 進行封裝。
 1: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
2: return send(msg, this.defaultMQProducer.getSendMsgTimeout());
3: }
4:
5: public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
6: return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
7: }
8:
/**
 * Sends a message, selecting a queue for every attempt and retrying when an attempt fails.
 *
 * <p>In {@code SYNC} mode the loop runs at most {@code 1 + getRetryTimesWhenSendFailed()} times;
 * in every other mode it runs once. After each attempt the broker's fault information is updated
 * through {@code updateFaultItem}.
 *
 * @param msg               message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      handed unchanged to {@code sendKernelImpl}
 * @param timeout           handed unchanged to {@code sendKernelImpl} on every attempt
 * @return the send result for SYNC; {@code null} for ASYNC and ONEWAY once the kernel call returned
 * @throws MQClientException    when all attempts failed without a result, when no name server address
 *                              is configured, or when no route exists for the topic
 * @throws MQBrokerException    rethrown when the broker response code is not one of the retryable codes
 *                              and no result is available
 * @throws InterruptedException rethrown immediately, without retry
 * @throws RemotingException    declared by the signature; inside the loop it is caught and recorded
 *                              as the cause of the final {@code MQClientException}
 */
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Verify that the producer is in the running state.
    this.makeSureStateOK();
    // Validate the message format.
    Validators.checkMessage(msg, this.defaultMQProducer);

    // Invocation id: only used in the log lines below to correlate the attempts of one send.
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Look up the publish (route) information of the topic.
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null; // queue chosen by the most recent attempt
        Exception exception = null; // exception of the most recent failed attempt
        SendResult sendResult = null; // result of the most recent attempt
        // Only SYNC sends are retried in this loop.
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0; // index of the current attempt
        String[] brokersSent = new String[timesTotal]; // broker name chosen by each attempt
        // Keep sending until an attempt succeeds or the attempts are used up.
        // NOTE(review): the same timeout value is passed to every attempt — confirm that the
        // resulting worst-case total duration is intended.
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Select the queue for this attempt; the previous broker name is passed as a hint.
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // Core send call.
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // Record the observed latency for this broker (no isolation).
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // The send returned but the status is not SEND_OK: retry only if
                                // the retry-another-broker switch is on.
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // Log, mark the broker as isolated, remember the exception and retry.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    // Log, mark the broker as isolated, remember the exception and retry.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    // Log and mark the broker as isolated; whether to retry depends on the response code.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        // These response codes lead to another attempt.
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        // Any other code ends the loop: return the result if there is one, otherwise rethrow.
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    // Interruption is not retried; the broker is not isolated for it.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                // No queue could be selected: stop trying.
                break;
            }
        }
        // Return the last result if any attempt produced one.
        if (sendResult != null) {
            return sendResult;
        }
        // Otherwise wrap the last exception; the response code depends on its type.
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    // No usable route: first check whether any name server address is configured at all.
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }
    // Name servers are configured but the topic has no route information.
    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
- 兩個 `send(...)` 方法對 `sendDefaultImpl(...)` 進行封裝。
- `invokeID` 僅僅用於打印日誌,無實際的業務用途。
- 每次發送後都會更新 `Broker` 可用性信息。在選擇發送到的消息隊列時,會參考 `Broker` 發送消息的延遲,詳細解析見:`MQFaultStrategy`。
- 當拋出 `RemotingException` 時,若是進行消息發送失敗重試,則可能致使消息發送重複。例如,發送消息超時(`RemotingTimeoutException`),實際 `Broker` 接收到該消息並處理成功。所以,`Consumer` 在消費時,須要保證冪等性。

1: private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
2: // 緩存中獲取 Topic發佈信息
3: TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
4: // 當無可用的 Topic發佈信息時,從Namesrv獲取一次
5: if (null == topicPublishInfo || !topicPublishInfo.ok()) {
6: this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
7: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
8: topicPublishInfo = this.topicPublishInfoTable.get(topic);
9: }
10: // 若獲取的 Topic發佈信息時候可用,則返回
11: if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
12: return topicPublishInfo;
13: } else { // 使用 {@link DefaultMQProducer#createTopicKey} 對應的 Topic發佈信息。用於 Topic發佈信息不存在 && Broker支持自動建立Topic
14: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
15: topicPublishInfo = this.topicPublishInfoTable.get(topic);
16: return topicPublishInfo;
17: }
18: }複製代碼
- 得到 Topic 發佈信息:優先從緩存 `topicPublishInfoTable` 中得到,其次從 `Namesrv` 中得到。
- 首先,從緩存 `topicPublishInfoTable` 中得到 Topic 發佈信息。
- 緩存中沒有可用的信息時,從 `Namesrv` 中得到 Topic 發佈信息。
- 當從 `Namesrv` 沒法獲取時,使用 `{@link DefaultMQProducer#createTopicKey}` 對應的 Topic 發佈信息。目的是當 `Broker` 開啓自動建立 Topic 開關時,`Broker` 接收到消息後自動建立 Topic,詳細解析見《RocketMQ 源碼分析 —— Topic》。

![]()
Latency類圖
1: public class MQFaultStrategy {
2: private final static Logger log = ClientLogger.getLog();
3:
4: /** 5: * 延遲故障容錯,維護每一個Broker的發送消息的延遲 6: * key:brokerName 7: */
8: private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
9: /** 10: * 發送消息延遲容錯開關 11: */
12: private boolean sendLatencyFaultEnable = false;
13: /** 14: * 延遲級別數組 15: */
16: private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
17: /** 18: * 不可用時長數組 19: */
20: private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
21:
22: /** 23: * 根據 Topic發佈信息 選擇一個消息隊列 24: * 25: * @param tpInfo Topic發佈信息 26: * @param lastBrokerName brokerName 27: * @return 消息隊列 28: */
29: public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
30: if (this.sendLatencyFaultEnable) {
31: try {
32: // 獲取 brokerName=lastBrokerName && 可用的一個消息隊列
33: int index = tpInfo.getSendWhichQueue().getAndIncrement();
34: for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
35: int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
36: if (pos < 0)
37: pos = 0;
38: MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
39: if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
40: if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
41: return mq;
42: }
43: }
44: // 選擇一個相對好的broker,並得到其對應的一個消息隊列,不考慮該隊列的可用性
45: final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
46: int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
47: if (writeQueueNums > 0) {
48: final MessageQueue mq = tpInfo.selectOneMessageQueue();
49: if (notBestBroker != null) {
50: mq.setBrokerName(notBestBroker);
51: mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
52: }
53: return mq;
54: } else {
55: latencyFaultTolerance.remove(notBestBroker);
56: }
57: } catch (Exception e) {
58: log.error("Error occurred when selecting message queue", e);
59: }
60: // 選擇一個消息隊列,不考慮隊列的可用性
61: return tpInfo.selectOneMessageQueue();
62: }
63: // 得到 lastBrokerName 對應的一個消息隊列,不考慮該隊列的可用性
64: return tpInfo.selectOneMessageQueue(lastBrokerName);
65: }
66:
67: /** 68: * 更新延遲容錯信息 69: * 70: * @param brokerName brokerName 71: * @param currentLatency 延遲 72: * @param isolation 是否隔離。當開啓隔離時,默認延遲爲30000。目前主要用於發送消息異常時 73: */
74: public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
75: if (this.sendLatencyFaultEnable) {
76: long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
77: this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
78: }
79: }
80:
81: /** 82: * 計算延遲對應的不可用時間 83: * 84: * @param currentLatency 延遲 85: * @return 不可用時間 86: */
87: private long computeNotAvailableDuration(final long currentLatency) {
88: for (int i = latencyMax.length - 1; i >= 0; i--) {
89: if (currentLatency >= latencyMax[i])
90: return this.notAvailableDuration[i];
91: }
92: return 0;
93: }複製代碼
- `MQFaultStrategy`:`Producer` 消息發送容錯策略。默認狀況下容錯策略關閉,即 `sendLatencyFaultEnable=false`。
- 第 74 至 79 行(`updateFaultItem(...)` 方法):更新延遲容錯信息。當 `Producer` 發送消息時間過長,則邏輯認爲 N 秒內不可用。按照 `latencyMax`、`notAvailableDuration` 的配置,對應以下:
| Producer發送消息消耗時長 | Broker不可用時長 |
| --- | --- |
| >= 15000 ms | 600 * 1000 ms |
| >= 3000 ms | 180 * 1000 ms |
| >= 2000 ms | 120 * 1000 ms |
| >= 1000 ms | 60 * 1000 ms |
| >= 550 ms | 30 * 1000 ms |
| >= 100 ms | 0 ms |
| >= 50 ms | 0 ms |
/**
 * Tracks latency and availability per object and can pick one of the tracked objects.
 *
 * @param <T> type used to identify a tracked object
 */
public interface LatencyFaultTolerance<T> {

    /**
     * Updates the latency and the not-available duration of an object.
     *
     * @param name                 object
     * @param currentLatency       latency
     * @param notAvailableDuration how long the object is to be treated as not available
     */
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    /**
     * Tells whether an object is available.
     *
     * @param name object
     * @return {@code true} if available
     */
    boolean isAvailable(final T name);

    /**
     * Removes an object.
     *
     * @param name object
     */
    void remove(final T name);

    /**
     * Picks one object.
     *
     * @return the picked object
     */
    T pickOneAtLeast();
}
1: public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
2:
3: /** 4: * 對象故障信息Table 5: */
6: private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
7: /** 8: * 對象選擇Index 9: * @see #pickOneAtLeast() 10: */
11: private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
12:
13: @Override
14: public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
15: FaultItem old = this.faultItemTable.get(name);
16: if (null == old) {
17: // 建立對象
18: final FaultItem faultItem = new FaultItem(name);
19: faultItem.setCurrentLatency(currentLatency);
20: faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
21: // 更新對象
22: old = this.faultItemTable.putIfAbsent(name, faultItem);
23: if (old != null) {
24: old.setCurrentLatency(currentLatency);
25: old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
26: }
27: } else { // 更新對象
28: old.setCurrentLatency(currentLatency);
29: old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
30: }
31: }
32:
33: @Override
34: public boolean isAvailable(final String name) {
35: final FaultItem faultItem = this.faultItemTable.get(name);
36: if (faultItem != null) {
37: return faultItem.isAvailable();
38: }
39: return true;
40: }
41:
42: @Override
43: public void remove(final String name) {
44: this.faultItemTable.remove(name);
45: }
46:
47: /** 48: * 選擇一個相對優秀的對象 49: * 50: * @return 對象 51: */
52: @Override
53: public String pickOneAtLeast() {
54: // 建立數組
55: final Enumeration<FaultItem> elements = this.faultItemTable.elements();
56: List<FaultItem> tmpList = new LinkedList<>();
57: while (elements.hasMoreElements()) {
58: final FaultItem faultItem = elements.nextElement();
59: tmpList.add(faultItem);
60: }
61: //
62: if (!tmpList.isEmpty()) {
63: // 打亂 + 排序。TODO 疑問:應該只能二選一。猜想Collections.shuffle(tmpList)去掉。
64: Collections.shuffle(tmpList);
65: Collections.sort(tmpList);
66: // 選擇順序在前一半的對象
67: final int half = tmpList.size() / 2;
68: if (half <= 0) {
69: return tmpList.get(0).getName();
70: } else {
71: final int i = this.whichItemWorst.getAndIncrement() % half;
72: return tmpList.get(i).getName();
73: }
74: }
75: return null;
76: }
77: }複製代碼
/**
 * Fault information of one tracked object: its last measured latency and the time from which
 * it counts as available again.
 *
 * <p>The natural ordering puts better items first: available before unavailable, then lower
 * latency, then earlier start timestamp. Latency and start timestamp are mutable, so the
 * ordering, {@link #equals(Object)} and {@link #hashCode()} of an instance can change over time.
 */
class FaultItem implements Comparable<FaultItem> {
    /**
     * Object name.
     */
    private final String name;
    /**
     * Last measured latency.
     */
    private volatile long currentLatency;
    /**
     * Time (epoch milliseconds) from which the object is available again.
     */
    private volatile long startTimestamp;

    public FaultItem(final String name) {
        this.name = name;
    }

    /**
     * Orders by availability, then latency, then start timestamp (all ascending, available first).
     *
     * @param other item to compare with
     * @return negative, zero or positive as this item sorts before, equal to or after {@code other}
     */
    @Override
    public int compareTo(final FaultItem other) {
        // Evaluate availability once per item: it depends on the clock, and re-reading it inside
        // one comparison could yield contradictory answers.
        final boolean thisAvailable = this.isAvailable();
        final boolean otherAvailable = other.isAvailable();
        if (thisAvailable != otherAvailable) {
            return thisAvailable ? -1 : 1;
        }

        final int byLatency = Long.compare(this.currentLatency, other.currentLatency);
        if (byLatency != 0) {
            return byLatency;
        }

        return Long.compare(this.startTimestamp, other.startTimestamp);
    }

    /**
     * Tells whether the object is available, i.e. whether the current time has reached
     * {@code startTimestamp}.
     *
     * @return {@code true} once {@code System.currentTimeMillis() >= startTimestamp}
     */
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

    @Override
    public int hashCode() {
        int result = getName() != null ? getName().hashCode() : 0;
        result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
        result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
        return result;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FaultItem)) {
            return false;
        }

        final FaultItem faultItem = (FaultItem) o;

        if (getCurrentLatency() != faultItem.getCurrentLatency()) {
            return false;
        }
        if (getStartTimestamp() != faultItem.getStartTimestamp()) {
            return false;
        }
        return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
    }

    // Accessors used by equals/hashCode above and by LatencyFaultToleranceImpl.

    public String getName() {
        return name;
    }

    public long getCurrentLatency() {
        return currentLatency;
    }

    public void setCurrentLatency(final long currentLatency) {
        this.currentLatency = currentLatency;
    }

    public long getStartTimestamp() {
        return startTimestamp;
    }

    public void setStartTimestamp(final long startTimestamp) {
        this.startTimestamp = startTimestamp;
    }
}
/**
 * Core send routine: resolves the broker address for the chosen queue, prepares the message and
 * the request header, runs the registered hooks and performs the actual send call.
 *
 * <p>The message body is captured before the send and restored in {@code finally}, because the
 * preparation (for example compression) may replace it.
 *
 * @param msg               message to send
 * @param mq                queue selected for this attempt
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      callback handed to the ASYNC send call
 * @param topicPublishInfo  publish information handed to the ASYNC send call
 * @param timeout           timeout handed to the send call
 * @return the result returned by the send call
 * @throws MQClientException    when no address can be found for the queue's broker
 * @throws RemotingException    rethrown after the after-send hook has been run
 * @throws MQBrokerException    rethrown after the after-send hook has been run
 * @throws InterruptedException rethrown after the after-send hook has been run
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Resolve the broker address; on a miss refresh the topic's publish information and look again.
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // Optionally switch to the broker's VIP channel (the broker serves on two ports).
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        // Remember the body: the logic below may replace it, e.g. by compressing it.
        byte[] prevBody = msg.getBody();
        try {
            // Assign the unique message id.
            MessageClientIDSetter.setUniqID(msg);
            // Compression.
            int sysFlag = 0;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
            // Transaction flag.
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // Hook: check whether sending is forbidden.
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // Hook: before-send logic.
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                // Checked after the transaction test, so a delay property overrides Trans_Msg_Half.
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // Build the send-message request header.
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            // Retry topic: move the reconsume counters from the message properties into the header.
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            // Send the message.
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    // The ASYNC overload additionally receives the callback, the publish
                    // information, the client factory and the async retry count.
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // Hook: after-send logic (success path).
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            // Return the send result.
            return sendResult;
        } catch (RemotingException e) {
            // Hook: after-send logic (failure path), then rethrow.
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // Restore the body captured before the send.
            msg.setBody(prevBody);
        }
    }
    // No address for the broker: fail.
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
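- 發送消息核心方法。該方法真正發起網絡請求,發送消息給 `Broker`。
- 構建發送消息請求 `SendMessageRequestHeader`。
- 調用 `MQClientInstance#sendMessage(...)`(代碼中爲 `this.mQClientFactory.getMQClientAPIImpl().sendMessage(...)`)發起網絡請求。

![]()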
接收發送消息API順序圖
1: @Override
2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
3: SendMessageContext mqtraceContext;
4: switch (request.getCode()) {
5: case RequestCode.CONSUMER_SEND_MSG_BACK:
6: return this.consumerSendMsgBack(ctx, request);
7: default:
8: // 解析請求
9: SendMessageRequestHeader requestHeader = parseRequestHeader(request);
10: if (requestHeader == null) {
11: return null;
12: }
13: // 發送請求Context。在 hook 場景下使用
14: mqtraceContext = buildMsgContext(ctx, requestHeader);
15: // hook:處理髮送消息前邏輯
16: this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
17: // 處理髮送消息邏輯
18: final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
19: // hook:處理髮送消息後邏輯
20: this.executeSendMessageHookAfter(response, mqtraceContext);
21: return response;
22: }
23: }
24:
/**
 * Handles one send-message request on the broker: validates it, builds the internal message,
 * stores it and produces the response.
 *
 * <p>When the store reports one of the statuses treated as sent ({@code PUT_OK},
 * {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT}, {@code SLAVE_NOT_AVAILABLE}), the
 * response is written through {@code doResponse} and this method returns {@code null}. In every
 * other case the populated response is returned to the caller.
 *
 * @param ctx                channel context of the request
 * @param request            incoming command
 * @param sendMessageContext hook context, filled with the outcome when send hooks are registered
 * @param requestHeader      parsed request header
 * @return the response to send, or {@code null} when it has already been written
 * @throws RemotingCommandException declared by the signature
 */
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // Initialise the response.
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    response.setOpaque(request.getOpaque());
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    if (log.isDebugEnabled()) {
        log.debug("receive SendMessage request command, {}", request);
    }

    // Reject the request with SYSTEM_ERROR while the broker has not started accepting sends yet.
    @SuppressWarnings("SpellCheckingInspection")
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    // Message (topic configuration) check. -1 is a sentinel: any other code afterwards means
    // msgCheck rejected the request.
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    // A negative queue id means: pick one of the topic's write queues at random.
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // Add the multi-tag flag when the topic uses multi-tag filtering.
    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }

    // Retry-topic handling: once the reconsume count reaches the maximum, the message is
    // redirected to the group's dead-letter-queue topic.
    String newTopic = requestHeader.getTopic();
    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        // Look up the subscription group configuration.
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }
        // Maximum reconsume count: group setting, overridden by the request header for
        // clients at version V3_4_9 or later.
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
        if (reconsumeTimes >= maxReconsumeTimes) { // maximum reached
            newTopic = MixAll.getDLQTopic(groupName);
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0
            );
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        }
    }

    // Build the MessageExtBrokerInner that is handed to the store.
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(sysFlag);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

    // Reject transactional messages when the broker is configured to do so.
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
            return response;
        }
    }

    // Store the message.
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        boolean sendOK = false;

        // Map the store status to a response code; sendOK marks the statuses treated as sent.
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
                sendOK = true;
                response.setCode(ResponseCode.SUCCESS);
                break;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                sendOK = true;
                break;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                sendOK = true;
                break;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                sendOK = true;
                break;

            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create mapped file failed, server is busy or broken.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }

        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
        if (sendOK) {
            // Statistics.
            this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
            this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
            this.brokerController.getBrokerStatsManager().incBrokerPutNums();

            // Fill in and write the response right here.
            response.setRemark(null);
            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            responseHeader.setQueueId(queueIdInt);
            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
            doResponse(ctx, request, response);

            // Hook: record the successful send in the context.
            if (hasSendMessageHook()) {
                sendMessageContext.setMsgId(responseHeader.getMsgId());
                sendMessageContext.setQueueId(responseHeader.getQueueId());
                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
            // The response has already been written by doResponse above.
            return null;
        } else {
            // Hook: record the failed send in the context.
            if (hasSendMessageHook()) {
                int wroteSize = request.getBody().length;
                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
    }

    return response;
}
#processRequest()
說明 :處理消息請求。#sendMessage()
說明 :發送消息,並返回發送消息結果。Broker
能夠設置隨機選擇一個消息隊列。MessageExtBrokerInner
。doResponse(ctx, request, response)
進行響應,最後return null
,緣由是:響應給 Producer
可能發生異常,#doResponse(ctx, request, response)
捕捉了該異常並輸出日誌。這樣作的話,咱們進行排查 Broker
接收消息成功後響應是否存在異常會方便不少。1: protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, 2: final SendMessageRequestHeader requestHeader, final RemotingCommand response) { // Pre-write validation of a send request; always returns the passed-in response, with an error code and remark set when a check fails
3: // Reject only when the broker lacks write permission AND the topic is an order topic
4: if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
5: && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
6: response.setCode(ResponseCode.NO_PERMISSION);
7: response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
8: + "] sending message is forbidden");
9: return response;
10: }
11: // Check whether the topic may receive messages; per the original note, {@link MixAll.DEFAULT_TOPIC} is currently the one that is not allowed
12: if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
13: String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
14: log.warn(errorMsg);
15: response.setCode(ResponseCode.SYSTEM_ERROR);
16: response.setRemark(errorMsg);
17: return response;
18: }
19: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
20: if (null == topicConfig) { // No topicConfig exists yet, so try to create one
21: int topicSysFlag = 0;
22: if (requestHeader.isUnitMode()) {
23: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
24: topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
25: } else {
26: topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
27: }
28: }
29: // Create the topic config from the request's default topic
30: log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
31: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
32: requestHeader.getTopic(), //
33: requestHeader.getDefaultTopic(), //
34: RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
35: requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
36: if (null == topicConfig) {
37: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
38: topicConfig =
39: this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
40: requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
41: topicSysFlag);
42: }
43: }
44: // Still no topic config after the creation attempts above: report TOPIC_NOT_EXIST
45: if (null == topicConfig) {
46: response.setCode(ResponseCode.TOPIC_NOT_EXIST);
47: response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
48: + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
49: return response;
50: }
51: }
52: // Validate the queue id against max(writeQueueNums, readQueueNums); only the upper bound is checked here
53: int queueIdInt = requestHeader.getQueueId();
54: int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
55: if (queueIdInt >= idValid) {
56: String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
57: queueIdInt,
58: topicConfig.toString(),
59: RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
60: log.warn(errorInfo);
61: response.setCode(ResponseCode.SYSTEM_ERROR);
62: response.setRemark(errorInfo);
63: return response;
64: }
65: return response;
66: }複製代碼
Broker
是否有寫入權限,topic配置是否存在,隊列編號是否正確。{@link MixAll.DEFAULT_TOPIC}
不被容許發送。defaultTopic
的Topic配置不存在,又或者是 存在可是不容許繼承,詳細解析見《RocketMQ 源碼分析 —— Topic》。1: public PutMessageResult putMessage(MessageExtBrokerInner msg) { // Runs availability and size checks, then delegates to commitLog.putMessage and records timing/failure statistics
2: if (this.shutdown) {
3: log.warn("message store has shutdown, so putMessage is forbidden");
4: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
5: }
6:
7: // Slave nodes do not accept writes; the warning is logged only when the printTimes counter is a multiple of 50000
8: if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
9: long value = this.printTimes.getAndIncrement();
10: if ((value % 50000) == 0) {
11: log.warn("message store is slave mode, so putMessage is forbidden ");
12: }
13:
14: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
15: }
16:
17: // Check whether the store is currently writable; printTimes is reset to 0 when it is
18: if (!this.runningFlags.isWriteable()) {
19: long value = this.printTimes.getAndIncrement();
20: if ((value % 50000) == 0) {
21: log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
22: }
23:
24: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
25: } else {
26: this.printTimes.set(0);
27: }
28:
29: // Topic too long: its length must not exceed Byte.MAX_VALUE
30: if (msg.getTopic().length() > Byte.MAX_VALUE) {
31: log.warn("putMessage message topic length too long " + msg.getTopic().length());
32: return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
33: }
34:
35: // Message properties string too long: its length must not exceed Short.MAX_VALUE
36: if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
37: log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
38: return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
39: }
40:
41: if (this.isOSPageCacheBusy()) {
42: return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
43: }
44:
45: long beginTime = this.getSystemClock().now();
46: // Append the message to the commitLog
47: PutMessageResult result = this.commitLog.putMessage(msg);
48:
49: long eclipseTime = this.getSystemClock().now() - beginTime;
50: if (eclipseTime > 500) {
51: log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
52: }
53: this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
54:
55: if (null == result || !result.isOk()) {
56: this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
57: }
58:
59: return result;
60: }複製代碼
CommitLog
實現。Broker
是否能夠寫入。CommitLog
進行存儲,詳細邏輯見:《RocketMQ 源碼分析 —— Message 存儲》。感謝閱讀、收藏、點贊本文的工程師同窗。
閱讀源碼是件令自己很愉悅的事情,編寫源碼解析是讓自己腦細胞死傷無數的過程,痛並快樂着。
若是有內容寫的存在錯誤,或是不清晰的地方,見笑了,🙂。歡迎加 QQ:7685413 咱們一塊兒探討,共進步。
再次感謝閱讀、收藏、點贊本文的工程師同窗。