consumer的常規訂閱消息的整體操做流程:api
構造初始化-》註冊監聽-》啓動-》無限循環請求隊列-》長鏈接的數據拉取安全
一,構造初始化
DefaultMQPushConsumer:常規的訂閱消息,須要制定惟一的分組名稱
最終構造的對象是DefaultMQPushConsumerImpl:
核心的功能實現,整合內部多個組件
配置消息的消息開始位置:ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
配置namesrv地址:拉取對應的配置內容和關係結構
訂閱內容:基於特定topic的訂閱,而後內置了表達式引擎(過濾內容)
註冊監聽:該監聽是監聽到有消息時的自有業務邏輯處理網絡
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wang-group-consmuer"); /* * Specify where to start in case the specified consumer group is a brand new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //指定nameserver的地址,全部的數據交互都是基於nameserve來進行的信息獲取和更新及心跳 consumer.setNamesrvAddr("127.0.0.1:9876"); /* * Subscribe one more more topics to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start();
構造函數的初始化數據結構
public DefaultMQPushConsumer(final String consumerGroup) { this(consumerGroup, null, new AllocateMessageQueueAveragely()); }
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); }
實際實現服務的核心屬性併發
public class DefaultMQPushConsumerImpl implements MQConsumerInner { /** * Delay some time when exception occur */ private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; /** * Flow control interval */ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; /** * Delay some time when suspend pull service */ private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000; private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; private final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPushConsumer defaultMQPushConsumer; //負載的選擇器 private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); private final long consumerStartTimestamp = System.currentTimeMillis(); private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); private final RPCHook rpcHook; private volatile ServiceState serviceState = ServiceState.CREATE_JUST; //客戶端實例,複用 private MQClientInstance mQClientFactory; //拉取服務請求的包裝 private PullAPIWrapper pullAPIWrapper; private volatile boolean pause = false; private boolean consumeOrderly = false; //消息監聽 private MessageListener messageListenerInner; //消息的偏移位置 private OffsetStore offsetStore; //客戶端服務 private ConsumeMessageService consumeMessageService; private long queueFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0;
二,app
啓動
start:最終的核心啓動是DefaultMQPushConsumerImpl的啓動
啓動和消息發送端的啓動相似,進行對應的初始化及啓動操做
配置服務的執行狀態,內部拆分了四個狀態機制
驗證關鍵配置,主要是影響到和新功能的配置的內容
將配置的訂閱信息拷貝到對應的監聽,對應的負載,消費等數據對象中
生成實例id
得到MQClient的實例工程,和producer一致,關鍵的核心操做實現,綜合體
將核心的配置注入到負載服務中
包裝拉取服務,包裝成獨立的綜合體
得到消息的開始消費偏移位置,用於消息拉取的請求參數
根據消息的監聽類型,設置,分爲兩類,順序監聽和併發監聽,併發監聽的效率更高
註冊消費處理
實例MQClient的工程方法啓動
標示爲當前服務爲運行狀態負載均衡
更新訂閱的topic信息
檢查使用的broker信息
發送心跳內容給全部的broker
立刻調用負載服務平衡框架
public void start() throws MQClientException { //消費端的核心啓動入口 this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
核心功能的操做啓動ide
//消費端主動推送的啓動核心入口 public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; //驗證核心關鍵配置 this.checkConfig(); //將配置中的訂閱信息拷貝到推送服務中 this.copySubscription(); //生成id if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } //得到客戶端工廠,共享的客戶端工程單例設計 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); //將配置中的核心配置賦值給負載中 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); //拉取消息的包裝api this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); //設置消息的開始位置 if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } //加載消息讀取位置信息 this.offsetStore.load(); //消息接受器的監聽類型 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } //消費者監聽服務啓動 this.consumeMessageService.start(); //註冊服務處理事件 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } //連接工程的啓動 mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } //更新topic信息到當前的內存數據結構中,便於後期的直接使用 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //檢查須要連接的broker是否可用 this.mQClientFactory.checkClientInBroker(); //發送心跳數據給當前服務所連接的全部broker中,操做過程是基於安全鎖機制 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); //喚醒負載平衡操做 this.mQClientFactory.rebalanceImmediately(); }
三,監聽
業務根據RocketMQ的規範,根據業務特定實現的接口
接口分爲兩類,順序消息,並行消費函數
//消息接受器的監聽類型 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = //有序消息 new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = //並行消息 new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }
並行消息
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final InternalLogger log = ClientLogger.getLog(); //消息的實際拉取操做 private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; //消費消息的頂層結構 private final DefaultMQPushConsumer defaultMQPushConsumer; //並行消息接聽 private final MessageListenerConcurrently messageListener; private final BlockingQueue<Runnable> consumeRequestQueue; //線程池執行配置 private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; //任務調用 private final ScheduledExecutorService scheduledExecutorService; //任務調度清除超時 private final ScheduledExecutorService cleanExpireMsgExecutors; public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { //清除超時消息 cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
四,長鏈接的拉取
啓動入口在MQClient的連接工程的啓動中
在啓動中指定負載服務的啓動中RebalanceService
在操做中this.waitForRunning(waitInterval);默認是阻塞20000,可是在服務啓動後,執行了this.mQClientFactory.rebalanceImmediately();
核心操做在重置負載中this.mqClientFactory.doRebalance();
有一個前提,根據當前服務的模式,分爲push自動領取,pull程序主動發起,還有一個條件就是是不是順序消息
遍歷訂閱信息,從新負載topic信息
最核心的操做在更新queue到負載中,完成了初始化
此時調用的是PullMessageService的執行拉取操做,該對象是核心
後期的操做就是自動化請求,基於隊列的形式,持續的請求,完成後再放置下一次請求到隊列中,循環請求
//連接工程的啓動 mQClientFactory.start();
//初始化客戶端請求實例 public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { //mq的核心配置信息 this.clientConfig = clientConfig; //當前進程內的惟一標識,升序數值 this.instanceIndex = instanceIndex; //netty通訊的客戶端配置 this.nettyClientConfig = new NettyClientConfig(); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); //解析客戶端請求,封裝的事件處理 this.clientRemotingProcessor = new ClientRemotingProcessor(this); //客戶端實例的實際實現,網絡通訊的核心,只是初始化了通訊框架,具體的連接後面根據不一樣的地址再進行連接操做 this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); //設置核心的nameserv地址 if (this.clientConfig.getNamesrvAddr() != null) { this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr()); } this.clientId = clientId; //mq管理 this.mQAdminImpl = new MQAdminImpl(this); //拉取消息的實現 this.pullMessageService = new PullMessageService(this); //負載均衡的實現,可能有相關的機器增長刪除,須要按期的進行重負載操做 this.rebalanceService = new RebalanceService(this); //消息發送者的包裝,發送者的發送者,這個邏輯有點亂,而且在構造方法中從新初始化的 //producer -> DefaultMQProducer -> DefaultMQProducerImpl -> MQClientInstance -> DefaultMQProducer this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); this.defaultMQProducer.resetClientConfig(clientConfig); //消費客戶端的狀態管理 this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}", this.instanceIndex, this.clientId, this.clientConfig, MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); } //啓動發送消息的核心,同時也是訂閱消息的核心 public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } //啓動netty的客戶端配置 // Start request-response channel this.mQClientAPIImpl.start(); //啓動任務,更新,驗證,心跳等操做 // Start various schedule tasks this.startScheduledTask(); //消費端 // Start pull service this.pullMessageService.start(); //消費端,從新負載設置,請求的初始化操做也在此方法內執行 // Start rebalance service this.rebalanceService.start(); //前面已經初始化過操做,該入參爲false,只須要初始化其餘操做 // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
初始化操做類RebalanceService
public class RebalanceService extends ServiceThread { private static long waitInterval = Long.parseLong(System.getProperty( "rocketmq.client.rebalance.waitInterval", "20000")); private final InternalLogger log = ClientLogger.getLog(); //消息的網絡操做及功能 private final MQClientInstance mqClientFactory; //負載服務的初始化,構造是鏈接服務的實例,當前是線程的子類 public RebalanceService(MQClientInstance mqClientFactory) { this.mqClientFactory = mqClientFactory; } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { //等待執行 this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return RebalanceService.class.getSimpleName(); } }
表現的狀態是在waiting,而後再指定負載,執行的負載操做是
//負載平衡重置 public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
此時consumerTable中的數據在前面的初始化啓動中進行了註冊操做
//註冊服務處理事件 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); }
執行平臺操做
@Override public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }
public void doRebalance(final boolean isOrder) { //得到訂閱的內容 Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { //操做對topic的負載 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } //基於數據的統一處理 this.truncateMessageQueueNotMyTopic(); }
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } //默認的集羣模式 case CLUSTERING: { //得到當前topic的訂閱隊列信息 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //請求得到topic的cid請求 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //排序操做 Collections.sort(mqAll); Collections.sort(cidAll); //選擇cid的策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } //是否有變化 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); //消息隊列的修改 this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
主要的操做在修改處理消息隊列的變化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; //得到處理隊列的數據 Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); //遍歷隊列同時對消息對鞋進行修正 while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); //針對相同的topic的處理 if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } //構造拉取消息的請求 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //封裝拉取的請求結構 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } //處理消息拉取請求 this.dispatchPullRequest(pullRequestList); return changed; }
拉取的操做this.dispatchPullRequest(pullRequestList);是抽象的設計,須要根據當前的實現類進行實現
@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { //實行消息的拉取操做 this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
逐層調用實現
public void executePullRequestImmediately(final PullRequest pullRequest) { this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); }
//將請求直接添加到隊列中,這裏確定是第一次初始化和後來無限次操做的入口 public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
將最終的拉取請求添加的請求隊列中,等待請求隊列的掃描和執行
最終的執行開始及操做,在啓動的最後一步執行this.mQClientFactory.rebalanceImmediately();
//喚醒負載 public void rebalanceImmediately() { this.rebalanceService.wakeup(); }
public void wakeup() { if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } }
該喚醒操做和簽名的waiting操做是一一對應的操做,待服務徹底啓動後,執行拉取的喚醒操做