本文主要研究一下rocketmq的getOrCreateMQClientInstancejava
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientManager.javagit
public class MQClientManager { private final static InternalLogger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>(); private MQClientManager() { } public static MQClientManager getInstance() { return instance; } public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) { return getOrCreateMQClientInstance(clientConfig, null); } public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; } public void removeClientFactory(final String clientId) { this.factoryTable.remove(clientId); } }
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/ClientConfig.javagithub
public class ClientConfig { public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses(); private String clientIP = RemotingUtil.getLocalAddress(); private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); protected String namespace; protected AccessChannel accessChannel = AccessChannel.LOCAL; /** * Pulling topic information interval from the named server */ private int pollNameServerInterval = 1000 * 30; /** * Heartbeat interval in microseconds with message broker */ private int heartbeatBrokerInterval = 1000 * 30; /** * Offset persistent interval for consumer */ private int persistConsumerOffsetInterval = 1000 * 5; private long pullTimeDelayMillsWhenException = 1000; private boolean unitMode = false; private String unitName; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false")); private boolean useTLS = TlsSystemConfig.tlsEnable; private LanguageCode language = LanguageCode.JAVA; public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); } public String getClientIP() { return clientIP; } public String getInstanceName() { return instanceName; } public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = String.valueOf(UtilAll.getPid()); } } //...... }
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.javaspring
public class DefaultMQProducerImpl implements MQProducerInner { //...... public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { RequestFutureTable.scanExpiredRequest(); } catch (Throwable e) { log.error("scan RequestFutureTable exception", e); } } }, 1000 * 3, 1000); } //...... }
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.javaapache
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); } //...... }
rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/RocketMQUtil.javasession
public class RocketMQUtil { //...... public static String getInstanceName(RPCHook rpcHook, String identify) { String separator = "|"; StringBuilder instanceName = new StringBuilder(); SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials(); instanceName.append(sessionCredentials.getAccessKey()) .append(separator).append(sessionCredentials.getSecretKey()) .append(separator).append(identify) .append(separator).append(UtilAll.getPid()); return instanceName.toString(); } //...... }
rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.javaapp
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class); private ApplicationContext applicationContext; //...... private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } //...... }
rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.javaide
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean { private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class); private DefaultMQProducer producer; //...... private TransactionMQProducer createTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService, RPCHook rpcHook) { Assert.notNull(producer, "Property 'producer' is required"); Assert.notNull(transactionListener, "Parameter 'transactionListener' is required"); TransactionMQProducer txProducer; if (Objects.nonNull(rpcHook)) { txProducer = new TransactionMQProducer(name, rpcHook); txProducer.setVipChannelEnabled(false); txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name)); } else { txProducer = new TransactionMQProducer(name); } txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener)); txProducer.setNamespace(producer.getNamespace()); txProducer.setNamesrvAddr(producer.getNamesrvAddr()); if (executorService != null) { txProducer.setExecutorService(executorService); } txProducer.setSendMsgTimeout(producer.getSendMsgTimeout()); txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed()); txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed()); txProducer.setMaxMessageSize(producer.getMaxMessageSize()); txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch()); txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK()); return txProducer; } //...... }