本文主要研究一下 RocketMQ 的 AccessChannel。
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java
/**
 * Identifies the kind of deployment a client connects to.
 */
public enum AccessChannel {

    /** The client connects to a private IDC cluster. */
    LOCAL,

    /** The client connects to a cloud service. */
    CLOUD
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java
/**
 * Contract for a component that collects trace data and transfers it asynchronously.
 */
public interface TraceDispatcher {

    /**
     * Initializes the asynchronous data-transfer module.
     *
     * @param nameSrvAddr   name server address handed to the dispatcher
     * @param accessChannel access channel (see {@link AccessChannel}) the dispatcher should use
     * @throws MQClientException if the dispatcher cannot be started
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Appends one piece of data to be transferred.
     *
     * @param ctx the data information to append
     * @return a boolean result; its exact meaning is defined by the implementation
     *         and is not documented by this interface
     */
    boolean append(Object ctx);

    /**
     * Performs the write/flush action.
     *
     * @throws IOException if flushing fails
     */
    void flush() throws IOException;

    /**
     * Closes the trace hook.
     */
    void shutdown();
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
public class AsyncTraceDispatcher implements TraceDispatcher { private final static InternalLogger log = ClientLogger.getLog(); private final int queueSize; private final int batchSize; private final int maxMsgSize; private final DefaultMQProducer traceProducer; private final ThreadPoolExecutor traceExecutor; // The last discard number of log private AtomicLong discardCount; private Thread worker; private ArrayBlockingQueue<TraceContext> traceContextQueue; private ArrayBlockingQueue<Runnable> appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; private DefaultMQProducerImpl hostProducer; private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); private AccessChannel accessChannel = AccessChannel.LOCAL; //...... public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } this.accessChannel = accessChannel; this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); } //...... 
class AsyncAppenderRequest implements Runnable { List<TraceContext> contextList; public AsyncAppenderRequest(final List<TraceContext> contextList) { if (contextList != null) { this.contextList = contextList; } else { this.contextList = new ArrayList<TraceContext>(1); } } private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) { String traceTopic = traceTopicName; if (AccessChannel.CLOUD == accessChannel) { traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; } final Message message = new Message(traceTopic, data.getBytes()); // Keyset of message trace includes msgId of or original message message.setKeys(keySet); //...... } //...... } //...... }
AccessChannel定義了兩個枚舉值,分別是LOCAL及CLOUD;TraceDispatcher的start方法會接收AccessChannel類型的參數;AsyncTraceDispatcher內部類AsyncAppenderRequest的sendTraceDataByMQ方法,針對accessChannel爲AccessChannel.CLOUD類型的,會給TraceConstants.TRACE_TOPIC_PREFIX加上regionId做爲traceTopic。