本文沿着《RocketMQ消息軌跡-設計篇》的思路,從以下3個方面對其源碼進行解讀:java
首先咱們來看一下在消息發送端如何啓用消息軌跡,示例代碼以下:json
public class TraceProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); // [@1](https://my.oschina.net/u/1198) producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
從上述代碼能夠看出其關鍵點是在建立DefaultMQProducer時指定開啓消息軌跡跟蹤。咱們不妨瀏覽一下DefaultMQProducer與啓用消息軌跡相關的構造函數:服務器
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
參數以下:app
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { // [@1](https://my.oschina.net/u/1198) this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message trace feature if (enableMsgTrace) { // @2 try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); // [@3](https://my.oschina.net/u/2648711) } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } } }
代碼@1:首先介紹一下其局部變量。運維
代碼@2:用來構建AsyncTraceDispatcher,看其名:異步轉發消息軌跡數據,稍後重點關注。異步
代碼@3:構建SendMessageTraceHookImpl對象,並使用AsyncTraceDispatcher用來異步轉發。函數
public void sendMessageBefore(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { // @1 return; } //build the context content of TuxeTraceContext TraceContext tuxeContext = new TraceContext(); tuxeContext.setTraceBeans(new ArrayList<tracebean>(1)); context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(context.getProducerGroup()); // @2 //build the data bean object of message trace TraceBean traceBean = new TraceBean(); // @3 traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setMsgType(context.getMsgType()); tuxeContext.getTraceBeans().add(traceBean); }
代碼@1:若是topic主題爲消息軌跡的Topic,直接返回。源碼分析
代碼@2:在消息發送上下文中,設置用來跟蹤消息軌跡的上下環境,裏面主要包含一個TraceBean集合、追蹤類型(TraceType.Pub)與生產者所屬的組。學習
代碼@3:構建一條跟蹤消息,用TraceBean來表示,記錄原消息的topic、tags、keys、發送到broker地址、消息體長度等消息。ui
從上文看出,sendMessageBefore主要的用途就是在消息發送的時候,先準備一部分消息跟蹤日誌,存儲在發送上下文環境中,此時並不會發送消息軌跡數據。
public void sendMessageAfter(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) // @1 || context.getMqTraceContext() == null) { return; } if (context.getSendResult() == null) { return; } if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) { // if switch is false,skip it return; } TraceContext tuxeContext = (TraceContext) context.getMqTraceContext(); TraceBean traceBean = tuxeContext.getTraceBeans().get(0); // @2 int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); // @3 tuxeContext.setCostTime(costTime); // @4 if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { tuxeContext.setSuccess(true); } else { tuxeContext.setSuccess(false); } tuxeContext.setRegionId(context.getSendResult().getRegionId()); traceBean.setMsgId(context.getSendResult().getMsgId()); traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2); localDispatcher.append(tuxeContext); // @5 }
代碼@1:若是topic主題爲消息軌跡的Topic,直接返回。
代碼@2:從MqTraceContext中獲取跟蹤的TraceBean,雖然設計成List結構體,但在消息發送場景,這裏的數據永遠只有一條,及時是批量發送也不例外。
代碼@3:獲取消息發送到收到響應結果的耗時。
代碼@4:設置costTime(耗時)、success(是否發送成功)、regionId(發送到broker所在的分區)、msgId(消息ID,全局惟一)、offsetMsgId(消息物理偏移量,若是是批量消息,則是最後一條消息的物理偏移量)、storeTime,這裏使用的是(客戶端發送時間 + 二分之一的耗時)來表示消息的存儲時間,這裏是一個估值。
代碼@5:將須要跟蹤的信息經過TraceDispatcher轉發到Broker服務器。其代碼以下:
public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); if (!result) { log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); } return result; }
這裏一個很是關鍵的點是offer方法的使用,當隊列沒法容納新的元素時會當即返回false,並不會阻塞。
接下來將目光轉向TraceDispatcher的實現。
TraceDispatcher,用於客戶端消息軌跡數據轉發到Broker,其默認實現類:AsyncTraceDispatcher。
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue<tracecontext>(1024); this.appenderQueue = new ArrayBlockingQueue<runnable>(queueSize); if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; } else { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } // @1 this.traceExecuter = new ThreadPoolExecutor(// : 10, // 20, // 1000 * 60, // TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); // @2 }
代碼@1:初始化核心屬性,該版本這些值都是「固化」的,用戶沒法修改。
代碼@2:調用getAndCreateTraceProducer方法建立用於發送消息軌跡的Producer(消息發送者),下面詳細介紹一下其實現。
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance == null) { //@1 traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); } return traceProducerInstance; }
代碼@1:若是還未創建發送者,則建立用於發送消息軌跡的消息發送者,其GroupName爲:_INNER_TRACE_PRODUCER,消息發送超時時間5s,最大容許發送消息大小118K。
public void start(String nameSrvAddr) throws MQClientException { if (isStarted.compareAndSet(false, true)) { // @1 traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); // @2 this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); }
開始啓動,其調用的時機爲啓動DefaultMQProducer時,若是啓用跟蹤消息軌跡,則調用之。
代碼@1:若是用於發送消息軌跡的發送者沒有啓動,則設置nameserver地址,並啓動着。
代碼@2:啓動一個線程,用於執行AsyncRunnable任務,接下來將重點介紹。
class AsyncRunnable implements Runnable { private boolean stopped; public void run() { while (!stopped) { List<tracecontext> contexts = new ArrayList<tracecontext>(batchSize); // @1 for (int i = 0; i < batchSize; i++) { TraceContext context = null; try { //get trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); // @2 } catch (InterruptedException e) { } if (context != null) { contexts.add(context); } else { break; } } if (contexts.size() > 0) { : AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); // @3 traceExecuter.submit(request); } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } } } }
代碼@1:構建待提交消息跟蹤Bean,每次最多發送batchSize,默認爲100條。
代碼@2:從traceContextQueue中取出一個待提交的TraceContext,設置超時時間爲5s,即如何該隊列中沒有待提交的TraceContext,則最多等待5s。
代碼@3:向線程池中提交任務AsyncAppenderRequest。
public void sendTraceData(List<tracecontext> contextList) { Map<string, list<tracetransferbean>> transBeanMap = new HashMap<string, list<tracetransferbean>>(); for (TraceContext context : contextList) { //@1 if (context.getTraceBeans().isEmpty()) { continue; } // Topic value corresponding to original message entity content String topic = context.getTraceBeans().get(0).getTopic(); // @2 // Use original message entity's topic as key String key = topic; List<tracetransferbean> transBeanList = transBeanMap.get(key); if (transBeanList == null) { transBeanList = new ArrayList<tracetransferbean>(); transBeanMap.put(key, transBeanList); } TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); // @3 transBeanList.add(traceData); } for (Map.Entry<string, list<tracetransferbean>> entry : transBeanMap.entrySet()) { // @4 flushData(entry.getValue()); } }
代碼@1:遍歷收集的消息軌跡數據。
代碼@2:獲取存儲消息軌跡的Topic。
代碼@3:對TraceContext進行編碼,這裏是消息軌跡的傳輸數據,稍後對其詳細看一下,瞭解其上傳的格式。
代碼@4:將編碼後的數據發送到Broker服務器。
根據消息軌跡跟蹤類型,其格式會有一些不同,下面分別來介紹其合適。
case Pub: { TraceBean bean = ctx.getTraceBeans().get(0); //append the content of context and traceBean to transferBean's TransData sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR); }
消息軌跡數據的協議使用字符串拼接,字段的分隔符號爲1,整個數據以2結尾,感受這個設計仍是有點「難以想象」,爲何不直接使用json協議呢?
for (TraceBean bean : ctx.getTraceBeans()) { sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// } }
軌跡就是按照上述順序拼接而成,各個字段使用1分隔,每一條記錄使用2結尾。
case SubAfter: { for (TraceBean bean : ctx.getTraceBeans()) { sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR); } } }
格式編碼同樣,就不重複多說。
通過上面的源碼跟蹤,消息發送端的消息軌跡跟蹤流程、消息軌跡數據編碼協議就清晰了,接下來咱們使用一張序列圖來結束本部分的講解。
其實行文至此,只關注了消息發送的消息軌跡跟蹤,消息消費的軌跡跟蹤又是如何呢?其實現原理實際上是同樣的,就是在消息消費先後執行特定的鉤子函數,其實現類爲ConsumeMessageTraceHookImpl,因爲其實現與消息發送的思路相似,故就不詳細介紹了。
其實從上面的分析,咱們已經得知,RocketMQ的消息軌跡數據存儲在到Broker上,那消息軌跡的主題名如何指定?其路由信息又怎麼分配纔好呢?是每臺Broker上都建立仍是隻在其中某臺上建立呢?RocketMQ支持系統默認與自定義消息軌跡的主題。
RocketMQ默認的消息軌跡主題爲:RMQ_SYS_TRACE_TOPIC,那該Topic須要手工建立嗎?其路由信息呢?
{ if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { // @1 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(1); // @2 topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } }
上述代碼出自TopicConfigManager的構造函數,在Broker啓動的時候會建立topicConfigManager對象,用來管理topic的路由信息。
代碼@1:若是Broker開啓了消息軌跡跟蹤(traceTopicEnable=true)時,會自動建立默認消息軌跡的topic路由信息,注意其讀寫隊列數爲1。
在建立消息發送者、消息消費者時,能夠顯示的指定消息軌跡的Topic,例如:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
經過customizedTraceTopic來指定消息軌跡Topic。
舒適提示:一般在生產環境上,將不會開啓自動建立主題,故須要RocketMQ運維管理人員提早建立好Topic。
好了,本文就介紹到這裏了,本文詳細介紹了RocktMQ消息軌跡的實現原理,下一篇,咱們將進入到多副本的學習中。
做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。