本文主要研究一下rocketmq的enableMsgTrace
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer { private final InternalLogger log = ClientLogger.getLog(); //...... public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message trace feature if (enableMsgTrace) { try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/hook/SendMessageHook.java
/**
 * Callback contract for observing a message send. Hooks are registered on the producer
 * implementation through {@code registerSendMessageHook}.
 */
public interface SendMessageHook {

    /**
     * @return the name identifying this hook
     */
    String hookName();

    /**
     * Callback for the phase before a send (per the method name; the call site is not part of
     * this excerpt).
     *
     * @param context state describing the send in progress
     */
    void sendMessageBefore(SendMessageContext context);

    /**
     * Callback for the phase after a send (per the method name; the call site is not part of
     * this excerpt).
     *
     * @param context state describing the send, shared with {@link #sendMessageBefore}
     */
    void sendMessageAfter(SendMessageContext context);
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
public class SendMessageTraceHookImpl implements SendMessageHook { private TraceDispatcher localDispatcher; public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) { this.localDispatcher = localDispatcher; } @Override public String hookName() { return "SendMessageTraceHook"; } @Override public void sendMessageBefore(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { return; } //build the context content of TuxeTraceContext TraceContext tuxeContext = new TraceContext(); tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1)); context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup())); //build the data bean object of message trace TraceBean traceBean = new TraceBean(); traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic())); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setMsgType(context.getMsgType()); tuxeContext.getTraceBeans().add(traceBean); } @Override public void sendMessageAfter(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) || context.getMqTraceContext() == null) { return; } if (context.getSendResult() == null) { return; } if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) { // if switch is false,skip it return; } TraceContext tuxeContext = (TraceContext) context.getMqTraceContext(); TraceBean traceBean = tuxeContext.getTraceBeans().get(0); int costTime = (int) 
((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); tuxeContext.setCostTime(costTime); if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { tuxeContext.setSuccess(true); } else { tuxeContext.setSuccess(false); } tuxeContext.setRegionId(context.getSendResult().getRegionId()); traceBean.setMsgId(context.getSendResult().getMsgId()); traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2); localDispatcher.append(tuxeContext); } }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java
/**
 * Contract for the component that collects message-trace data and transfers it asynchronously.
 */
public interface TraceDispatcher {

    /**
     * Initializes the asynchronous data-transfer module.
     *
     * @param nameSrvAddr name server address to use
     * @param accessChannel access channel to use
     * @throws MQClientException if the module cannot be started
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Appends trace data to be transferred.
     *
     * @param ctx the trace data
     * @return whether the data was accepted
     */
    boolean append(Object ctx);

    /**
     * Flushes pending trace data.
     *
     * @throws IOException if flushing fails
     */
    void flush() throws IOException;

    /**
     * Closes the trace hook.
     */
    void shutdown();
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
public class AsyncTraceDispatcher implements TraceDispatcher { private final static InternalLogger log = ClientLogger.getLog(); private final int queueSize; private final int batchSize; private final int maxMsgSize; private final DefaultMQProducer traceProducer; private final ThreadPoolExecutor traceExecutor; // The last discard number of log private AtomicLong discardCount; private Thread worker; private ArrayBlockingQueue<TraceContext> traceContextQueue; private ArrayBlockingQueue<Runnable> appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; private DefaultMQProducerImpl hostProducer; private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); private AccessChannel accessChannel = AccessChannel.LOCAL; public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024); this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; } else { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } this.traceExecutor = new ThreadPoolExecutor(// 10, // 20, // 1000 * 60, // TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); } public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } 
this.accessChannel = accessChannel; this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); } private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance == null) { traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); } return traceProducerInstance; } @Override public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); if (!result) { log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); } return result; } @Override public void flush() throws IOException { // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return. 
long end = System.currentTimeMillis() + 500; while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) { try { Thread.sleep(1); } catch (InterruptedException e) { break; } } log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size()); } @Override public void shutdown() { this.stopped = true; this.traceExecutor.shutdown(); if (isStarted.get()) { traceProducer.shutdown(); } this.removeShutdownHook(); } public void registerShutDownHook() { if (shutDownHook == null) { shutDownHook = new Thread(new Runnable() { private volatile boolean hasShutdown = false; @Override public void run() { synchronized (this) { if (!this.hasShutdown) { try { flush(); } catch (IOException e) { log.error("system MQTrace hook shutdown failed ,maybe loss some trace data"); } } } } }, "ShutdownHookMQTrace"); Runtime.getRuntime().addShutdownHook(shutDownHook); } } public void removeShutdownHook() { if (shutDownHook != null) { Runtime.getRuntime().removeShutdownHook(shutDownHook); } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
class AsyncRunnable implements Runnable { private boolean stopped; @Override public void run() { while (!stopped) { List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize); for (int i = 0; i < batchSize; i++) { TraceContext context = null; try { //get trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } if (context != null) { contexts.add(context); } else { break; } } if (contexts.size() > 0) { AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); traceExecutor.submit(request); } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } } } }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
class AsyncAppenderRequest implements Runnable { List<TraceContext> contextList; public AsyncAppenderRequest(final List<TraceContext> contextList) { if (contextList != null) { this.contextList = contextList; } else { this.contextList = new ArrayList<TraceContext>(1); } } @Override public void run() { sendTraceData(contextList); } public void sendTraceData(List<TraceContext> contextList) { Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>(); for (TraceContext context : contextList) { if (context.getTraceBeans().isEmpty()) { continue; } // Topic value corresponding to original message entity content String topic = context.getTraceBeans().get(0).getTopic(); String regionId = context.getRegionId(); // Use original message entity's topic as key String key = topic; if (!StringUtils.isBlank(regionId)) { key = key + TraceConstants.CONTENT_SPLITOR + regionId; } List<TraceTransferBean> transBeanList = transBeanMap.get(key); if (transBeanList == null) { transBeanList = new ArrayList<TraceTransferBean>(); transBeanMap.put(key, transBeanList); } TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); transBeanList.add(traceData); } for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) { String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR)); String dataTopic = entry.getKey(); String regionId = null; if (key.length > 1) { dataTopic = key[0]; regionId = key[1]; } flushData(entry.getValue(), dataTopic, regionId); } } /** * Batch sending data actually */ private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) { if (transBeanList.size() == 0) { return; } // Temporary buffer StringBuilder buffer = new StringBuilder(1024); int count = 0; Set<String> keySet = new HashSet<String>(); for (TraceTransferBean bean : transBeanList) { // Keyset of message trace includes msgId of or original message 
keySet.addAll(bean.getTransKey()); buffer.append(bean.getTransData()); count++; // Ensure that the size of the package should not exceed the upper limit. if (buffer.length() >= traceProducer.getMaxMessageSize()) { sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId); // Clear temporary buffer after finishing buffer.delete(0, buffer.length()); keySet.clear(); count = 0; } } if (count > 0) { sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId); } transBeanList.clear(); } /** * Send message trace data * * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) * @param data the message trace data in this batch */ private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) { String traceTopic = traceTopicName; if (AccessChannel.CLOUD == accessChannel) { traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; } final Message message = new Message(traceTopic, data.getBytes()); // Keyset of message trace includes msgId of or original message message.setKeys(keySet); try { Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic); SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { log.info("send trace data ,the traceData is " + data); } }; if (traceBrokerSet.isEmpty()) { // No cross set traceProducer.send(message, callback, 5000); } else { traceProducer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Set<String> brokerSet = (Set<String>) arg; List<MessageQueue> filterMqs = new ArrayList<MessageQueue>(); for (MessageQueue queue : mqs) { if (brokerSet.contains(queue.getBrokerName())) { filterMqs.add(queue); } } int index = sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % filterMqs.size(); if (pos < 0) { pos = 0; } return 
filterMqs.get(pos); } }, traceBrokerSet, callback); } } catch (Exception e) { log.info("send trace data,the traceData is" + data); } } private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) { Set<String> brokerSet = new HashSet<String>(); TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo()); producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = producer.getTopicPublishInfoTable().get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) { brokerSet.add(queue.getBrokerName()); } } return brokerSet; } }
DefaultMQProducer的构造器在enableMsgTrace为true时会创建AsyncTraceDispatcher,再创建SendMessageTraceHookImpl,然后执行getDefaultMQProducerImpl().registerSendMessageHook