本文主要研究一下rocketmq的AccessChannel
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java
/**
 * Identifies which kind of deployment a client connects to.
 */
public enum AccessChannel {

    /** The client connects to a private IDC cluster. */
    LOCAL,

    /** The client connects to a Cloud service. */
    CLOUD;
}
複製代碼
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java
/**
 * Contract for a component that accepts trace data and transfers it asynchronously.
 * Lifecycle: {@link #start}, then {@link #append} / {@link #flush}, then {@link #shutdown}.
 */
public interface TraceDispatcher {
/**
 * Initializes the asynchronous data-transfer module.
 *
 * @param nameSrvAddr name server address handed to the implementation
 * @param accessChannel the {@link AccessChannel} the implementation should operate with
 * @throws MQClientException if the implementation fails to start
 *         (NOTE(review): exact conditions are implementation-specific - confirm)
 */
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
/**
 * Appends data to be transferred.
 *
 * @param ctx the data information to transfer
 * @return NOTE(review): presumably {@code true} when the data was accepted -
 *         confirm against the implementation
 */
boolean append(Object ctx);
/**
 * Performs the write-flush action.
 *
 * @throws IOException if flushing fails
 */
void flush() throws IOException;
/**
 * Closes the trace hook.
 */
void shutdown();
}
複製代碼
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
public class AsyncTraceDispatcher implements TraceDispatcher {
private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
//......
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
//......
class AsyncAppenderRequest implements Runnable {
List<TraceContext> contextList;
public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TraceContext>(1);
}
}
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
if (AccessChannel.CLOUD == accessChannel) {
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
//......
}
//......
}
//......
}
複製代碼
AccessChannel定義了兩個枚舉值,分別是LOCAL及CLOUD;TraceDispatcher的start方法會接收AccessChannel類型的參數;AsyncTraceDispatcher內部類AsyncAppenderRequest的sendTraceDataByMQ方法,針對accessChannel爲AccessChannel.CLOUD類型的,會給TraceConstants.TRACE_TOPIC_PREFIX加上regionId做爲traceTopic