說在前面java
接上篇express
源碼解析apache
進入這個方法,獲取並建立channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateChannelbootstrap
private Channel getAndCreateChannel(final String addr) throws InterruptedException {if (null == addr) {// 獲取和namesrv通訊的channel =》return getAndCreateNameserverChannel();}ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}// 建立channel=》return this.createChannel(addr); }
進入這個方法,獲取和namesrv通訊的channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateNameserverChannelc#
private Channel getAndCreateNameserverChannel() throws InterruptedException {String addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}// 從namesrvAddrChoosed中查找namesrv,若是不存在同步輪詢的方式從namesrvAddrList中取final List<String> addrList = this.namesrvAddrList.get();if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}if (addrList != null && !addrList.isEmpty()) {for (int i = 0; i < addrList.size(); i++) {int index = this.namesrvIndex.incrementAndGet();index = Math.abs(index);index = index % addrList.size();String newAddr = addrList.get(index);this.namesrvAddrChoosed.set(newAddr);log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);// 同步建立渠道=》Channel channelNew = this.createChannel(newAddr);if (channelNew != null) {return channelNew;}}}} catch (Exception e) {log.error("getAndCreateNameserverChannel: create name server channel exception", e);} finally {this.lockNamesrvChannel.unlock();}} else {log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}return null; }
進入這個方法,同步建立渠道,org.apache.rocketmq.remoting.netty.NettyRemotingClient#createChannel緩存
private Channel createChannel(final String addr) throws InterruptedException {ChannelWrapper cw = this.channelTables.get(addr);// 代碼走到這裏,這裏的邏輯正常狀況下是走不到的,爲了代碼嚴謹性if (cw != null && cw.isOK()) {cw.getChannel().close();channelTables.remove(addr);}if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {boolean createNewConnection;// 爲了代碼嚴謹性,這裏又作了一次判斷cw = this.channelTables.get(addr);if (cw != null) {if (cw.isOK()) {cw.getChannel().close();this.channelTables.remove(addr);createNewConnection = true;// 若是channel還在用,不讓建立} else if (!cw.getChannelFuture().isDone()) {createNewConnection = false;} else {this.channelTables.remove(addr);createNewConnection = true;}} else {createNewConnection = true;}if (createNewConnection) {// 從新創建鏈接ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);cw = new ChannelWrapper(channelFuture);// 創建的channel也放到本次緩存中this.channelTables.put(addr, cw);}} catch (Exception e) {log.error("createChannel: create channel exception", e);} finally {this.lockChannelTables.unlock();}} else {log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}if (cw != null) {ChannelFuture channelFuture = cw.getChannelFuture();if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {// 對channel再次判斷if (cw.isOK()) {log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());return cw.getChannel();} else {log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());}} else {log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),channelFuture.toString());}}return null; }
往上返回到這個方法,根據地址建立channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient#createChannel,這個方法上面介紹過了。微信
進入這個方法,執行同步請求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImplapp
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();try {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);// 緩存正在進行的響應this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());// 響應解析完畢會解除countDownLatch的阻塞 =》responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});// 這裏用countDownLatch實現的阻塞 =》RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;} finally {this.responseTable.remove(opaque);} }
往上返回到這個方法,更新訂閱的topic配置,org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfigasync
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {boolean isChange = false;Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();for (String topic : orderTopics) {TopicConfig topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null && !topicConfig.isOrder()) {topicConfig.setOrder(true);isChange = true;log.info("update order topic config, topic={}, order={}", topic, true);}}for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {String topic = entry.getKey();if (!orderTopics.contains(topic)) {TopicConfig topicConfig = entry.getValue();if (topicConfig.isOrder()) {topicConfig.setOrder(false);isChange = true;log.info("update order topic config, topic={}, order={}", topic, false);}}}if (isChange) {// 更新數據版本號this.dataVersion.nextVersion();// 持久化=》this.persist();}} }
往上返回到這個方法,註冊消費者,org.apache.rocketmq.broker.client.ConsumerManager#registerConsumeride
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// 獲取消費組信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) {ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}// 更新channel信息=》boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);// 更新訂閱細信息=》boolean r2 = consumerGroupInfo.updateSubscription(subList);if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {// 通知消費組的全部消費者channel和訂閱信息改變了=》this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2; }
進入這個方法,更新channel信息,org.apache.rocketmq.broker.client.ConsumerGroupInfo#updateChannel
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {boolean updated = false;this.consumeType = consumeType;this.messageModel = messageModel;this.consumeFromWhere = consumeFromWhere;// 獲取channel信息ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());if (null == infoOld) {ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);if (null == prev) {log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,messageModel, infoNew.toString());updated = true;}infoOld = infoNew;} else {if (!infoOld.getClientId().equals(infoNew.getClientId())) {log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",this.groupName,infoOld.toString(),infoNew.toString());this.channelInfoTable.put(infoNew.getChannel(), infoNew);}}this.lastUpdateTimestamp = System.currentTimeMillis();infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);return updated; }
進入這個方法,更新訂閱細信息,org.apache.rocketmq.broker.client.ConsumerGroupInfo#updateSubscription
public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;for (SubscriptionData sub : subList) {// 獲取以前的topic訂閱信息SubscriptionData old = this.subscriptionTable.get(sub.getTopic());if (old == null) {SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}} else if (sub.getSubVersion() > old.getSubVersion()) {if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}this.subscriptionTable.put(sub.getTopic(), sub);}}Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionData> next = it.next();String oldTopic = next.getKey();boolean exist = false;for (SubscriptionData sub : subList) {if (sub.getTopic().equals(oldTopic)) {exist = true;break;}}if (!exist) {log.warn("subscription changed, group: {} remove topic {} {}",this.groupName,oldTopic,next.getValue().toString());// 若是已存在以前的topic刪除it.remove();updated = true;}}this.lastUpdateTimestamp = System.currentTimeMillis();return updated; }
往上返回到這個方法,通知消費組的全部消費者channel和訂閱信息改變了,org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener#handle
@Overridepublic void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:if (args == null || args.length < 1) {return;}List<Channel> channels = (List<Channel>) args[0];if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {for (Channel chl : channels) {// broker按channel、消費組通知client=》this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;case UNREGISTER:// 按組取消註冊=》this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER:if (args == null || args.length < 1) {return;}Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];// 按組註冊訂閱信息=》this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException("Unknown event " + event);} }
進入這個方法, broker按channel、消費組通知client,org.apache.rocketmq.broker.client.net.Broker2Client#notifyConsumerIdsChanged
public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null == consumerGroup) {log.error("notifyConsumerIdsChanged consumerGroup is null");return;}NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {// 單向通知=》this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());} }
進入這個方法,單向通知,org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeOneway,這個方法前面介紹過了。
進入這個方法,按組取消註冊,org.apache.rocketmq.broker.filter.ConsumerFilterManager#unRegister
public void unRegister(final String consumerGroup) {for (String topic : filterDataByTopic.keySet()) {// 取消註冊=》this.filterDataByTopic.get(topic).unRegister(consumerGroup);} }
進入這個方法,取消註冊,org.apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#unRegister
public void unRegister(String consumerGroup) {if (!this.groupFilterData.containsKey(consumerGroup)) {return;}// 獲取消費者過濾信息ConsumerFilterData data = this.groupFilterData.get(consumerGroup);if (data == null || data.isDead()) {return;}long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);data.setDeadTime(now); }
往上返回到這個方法,按組註冊訂閱信息,org.apache.rocketmq.broker.filter.ConsumerFilterManager#register(java.lang.String, java.util.Collection<org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData>)
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {for (SubscriptionData subscriptionData : subList) {// 註冊訂閱信息=》register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getExpressionType(),subscriptionData.getSubVersion());}// make illegal topic dead. 按組獲取消費者過濾數據=》Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();while (iterator.hasNext()) {ConsumerFilterData filterData = iterator.next();boolean exist = false;for (SubscriptionData subscriptionData : subList) {if (subscriptionData.getTopic().equals(filterData.getTopic())) {exist = true;break;}}if (!exist && !filterData.isDead()) {// 若是要更新的訂閱數據不存在,以前的過濾數據失效filterData.setDeadTime(System.currentTimeMillis());log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);}} }
進入這個方法,註冊訂閱信息,org.apache.rocketmq.broker.filter.ConsumerFilterManager#register(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long)
public boolean register(final String topic, final String consumerGroup, final String expression,final String type, final long clientVersion) {if (ExpressionType.isTagType(type)) {return false;}if (expression == null || expression.length() == 0) {return false;}// 獲取topic的過濾數據信息FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;}BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);// 註冊過濾消息=》return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion); }
進入這個方法,註冊過濾消息,org.apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#register
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,long clientVersion) {// 獲取以前的消費者過濾數據ConsumerFilterData old = this.groupFilterData.get(consumerGroup);if (old == null) {// 構建消費者過濾數據=》ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {return false;}consumerFilterData.setBloomFilterData(bloomFilterData);old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old == null) {log.info("New consumer filter registered: {}", consumerFilterData);return true;} else {// 若是當前的client版本小於等於老的client版本if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}// 若是數據同樣if (clientVersion == old.getClientVersion() && old.isDead()) {// 設置取消註冊的數據註冊reAlive(old);return true;}return false;} else {this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);return true;}}} else {if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}return false;}boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);if (old.getBloomFilterData() == null && bloomFilterData != null) {change = true;}if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {change = true;}// if subscribe data is changed, or consumer is died too long.若是訂閱數據改變了if (change) {// 構建消費者過濾數據ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {// new expression compile error, remove old, let client report error.構建訂閱數據報錯,刪除老的訂閱數據this.groupFilterData.remove(consumerGroup);return false;}consumerFilterData.setBloomFilterData(bloomFilterData);this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("Consumer filter info change, old: {}, new: {}, change: {}",old, consumerFilterData, change);return true;} else {old.setClientVersion(clientVersion);if (old.isDead()) {reAlive(old);}return true;}} }
往上返回到這個方法,註冊生產者,org.apache.rocketmq.broker.client.ProducerManager#registerProducer
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {try {ClientChannelInfo clientChannelInfoFound = null;if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// 獲取消費組的channel信息HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);if (null == channelTable) {channelTable = new HashMap<>();this.groupChannelTable.put(group, channelTable);}clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());if (null == clientChannelInfoFound) {channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);log.info("new producer connected, group: {} channel: {}", group,clientChannelInfo.toString());}} finally {this.groupChannelLock.unlock();}if (clientChannelInfoFound != null) {clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());}} else {log.warn("ProducerManager registerProducer lock timeout");}} catch (InterruptedException e) {log.error("", e);} }
往上返回到這個方法,org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat結束。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣