目錄
本次分析基於 RocketMQ release-4.5.2
版本。
分析的目標是: RocketMQ
中 Producer
是怎麼將消息發送至 Broker
的?
說到學習源碼,首先當然是要把源代碼下載下來,官方地址。使用 git clone https://github.com/apache/rocketmq.git
將源代碼 clone
至本地。
用IDEA
打開該項目
可以看到有很多子模塊,這次學習的是 Producer
故打開 rocketmq-client
模塊,可以在單元測試中找到測試 Producer
功能的類。
打開該類,觀察其方法
可以看出以 test
開頭的方法都是單元測試方法,可以直接運行。 init
方法和 terminate
方法分別是單元測試的初始化方法和銷燬方法。
// 建立一個默認的客戶端實例 @Spy private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); // mock 一個真正與 broker 交互的對象 @Mock private MQClientAPIImpl mQClientAPIImpl; @Mock private NettyRemotingClient nettyRemotingClient; private DefaultMQProducer producer; private Message message; private Message zeroMsg; private Message bigMessage; private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; // 初始化 @Before public void init() throws Exception { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); // 建立一個默認的 producer producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setCompressMsgBodyOverHowmuch(16); message = new Message(topic, new byte[] {'a'}); zeroMsg = new Message(topic, new byte[] {}); bigMessage = new Message(topic, "This is a very huge message!".getBytes()); producer.start(); // 反射將客戶端實例設置到 producer 對象中 Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); // 反射將一個真正與 broker 交互的對象 設置到客戶端實例中 field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); // 註冊 客戶端實例 producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); // mock 交互對象發消息 when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), 
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); } // 銷燬 @After public void terminate() { producer.shutdown(); }
這裏選 testSendMessageSync_Success()
方法作爲這次分析入口。(該方法用來測試成功地發送同步消息)。
DEBUG
跟蹤調用鏈可以看出 MQClientAPIImpl#sendMessage
,纔是發送消息給 broker
的底層封裝,其通過引入 rocketmq-remoting
模塊的 org.apache.rocketmq.remoting.netty.NettyRemotingClient
類與 Broker
交互。至於與 Broker
基於 Netty
的 RPC
協議分析,這裏不展開分析。可以通過閱讀上文提到的 NettyRemotingClient
類進一步瞭解。
PS:由於使用 mockito
因此調用鏈中會有些與 producer
發送消息不相關的棧。
PS:通過查看調用鏈的棧信息,可以快速瞭解源碼中某一行爲的整體流程。
以下源碼按調用鏈從底層往上走
public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { // 發送消息 return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } // 發送消息 public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // RPC 請求對象 RemotingCommand request = null; if (sendSmartMsg || msg instanceof MessageBatch) { // 該類的 field 全爲 a,b,c,d 等,能夠加速 FastJson 反序列化 SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); // 根據 request code 建立 RPC 請求對象 // 該設計是經過類型碼的形式,來標識不一樣類型的請求 request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? 
RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } // 設置請求體,也就是消息的消息體 request.setBody(msg.getBody()); // 根據模式 選擇 oneway 或者 同步 或者 異步 switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; } return null; }
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // 查找 brokerName 對應 broker,master 節點的地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); // 查找失敗,嘗試從新從 NameServer 拉取 if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { // 根據 VIP Channel 設置,更新 broker 節點地址 brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { // 設置 自定義屬性 UNIQ_KEY -> 0A0A15A01F3C18B4AAC22DB7B6AC0000 MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { // 設置 自定義屬性 INSTANCE_ID -> <NameSpace> msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } // 消息設置 處理標識,用於標識消息通過什麼樣的處理,能夠查看該類 org.apache.rocketmq.common.sysflag.MessageSysFlag ,該類是設計較好的標識處理,能夠借鑑 int sysFlag = 0; boolean msgBodyCompressed = false; // 根據 DefaultMQProducer#compressMsgBodyOverHowmuch 選擇是否壓縮,默認超過 4K 則壓縮,壓縮算法爲 zip if (this.tryToCompressMessage(msg)) { // 設置壓縮標識,COMPRESSED_FLAG = 0x1 sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } // 獲取屬性,判斷是不是事務消息, PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG" final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { // 設置事務標識,TRANSACTION_PREPARED_TYPE = 0x1 << 2 sysFlag |= 
MessageSysFlag.TRANSACTION_PREPARED_TYPE; } // hook 操做,這段是檢測是否有發送權限 hook 操做, Hook 接口爲 org.apache.rocketmq.client.hook.CheckForbiddenHook, 注意:在 DefaultMQProducerImpl 中,該類是以列表形式存在的 if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } // hook 操做,這段是執行發送消息前的 hook 操做, Hook 接口爲 org.apache.rocketmq.client.hook.SendMessageHook, 注意:在 DefaultMQProducerImpl 中,該類是以列表形式存在的 if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } // 設置 request header SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); 
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. 
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // 異步發送消息 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // oneway 或同步發送消息 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } // hook 操做,這段是執行發送消息後的 hook 操做, Hook 接口爲 org.apache.rocketmq.client.hook.SendMessageHook, 注意:在 DefaultMQProducerImpl 中,該類是以列表形式存在的 if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw 
e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 確保 Producer 狀態爲 RUNNING 態,全部狀態可查看 org.apache.rocketmq.common.ServiceState 枚舉類 this.makeSureStateOK(); // 校驗消息是否符合規則,該工具類是比較好的參數校驗封裝形式,能夠參考借鑑 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); // 第一次執行發送消息前的時間戳 long beginTimestampFirst = System.currentTimeMillis(); // 當前次發送消息前的時間戳 long beginTimestampPrev = beginTimestampFirst; // 當前次發送消息後的時間戳 long endTimestamp = beginTimestampFirst; // 從 NameServer 獲取 topic 相關信息,包含 topic 中的 queue 相關信息; queue 路由相關信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 當 (topic 相關信息不爲 null) 而且 (topic 中的 queue 列表不爲 null 或者 空隊列) if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 當模式爲 SYNC 時, 默認執行次數爲 3 次,包含 1 次正常調用,2 次重試;其餘只執行 1 次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; // 第幾回發送對應的 broker 信息 String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { // 獲取上次發送的 broker 名稱 String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 選擇一個 queue 進行發送。有失敗重試策略,默認使用 RoundRobin 算法,能夠經過 DefaultMQProducer#setSendLatencyFaultEnable 設置啓用 LatencyFault 策略 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { // 記錄發送前的時間戳 beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. 
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } // 計算花費時間 long costTime = beginTimestampPrev - beginTimestampFirst; // 花費時間 超過了 timeout ,則超時處理 if (timeout < costTime) { callTimeout = true; break; } // 發送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 當設置啓用 LatencyFault 策略時,更新 FaultItem this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // 根據模式,選擇發送消息後的處理方式 switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 模式爲 SYNC 時,會有重試處理 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } // 如下代碼爲異常處理 } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case 
ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." 
+ FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 發送消息 同步模式 return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } /** * DEFAULT SYNC ------------------------------------------------------- */ public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 發送消息,默認超時時間爲3000ms return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
該類使用了門面模式,簡單來講就是通過一個門面類,將內部複雜的細節封裝好,給客戶端提供統一的調用接口。
擴展可以參考博主以前寫的博客《設計模式學習筆記 —— 外觀 (Facade) 模式》
/** * Send message in synchronous mode. This method returns only when the sending procedure totally completes. * </p> * * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially * delivered to broker(s). It's up to the application developers to resolve potential duplication issue. * * @param msg Message to send. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. * @throws MQClientException if there is any client error. * @throws RemotingException if there is any network-tier error. * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 校驗消息是否符合規則,該工具類是比較好的參數校驗封裝形式,能夠參考借鑑 Validators.checkMessage(msg, this); // 使用 NamespaceUtil 工具類包裝 namespace ,邏輯看 org.apache.rocketmq.common.protocol.NamespaceUtilTest#testWrapNamespace 單元測試 msg.setTopic(withNamespace(msg.getTopic())); // 發送消息 return this.defaultMQProducerImpl.send(msg); }
@Test public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { // mock 從 NameServer 獲取 Topic 的路由信息 when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); // 發送消息 SendResult sendResult = producer.send(message); assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(456L); }