RocketMq源碼分析之DefaultLitePullConsumer主動拉取消息分析consumeRequestCachejava
由於作RMQ的proxy的消費者的時候,消息消費只能主動拉取,而後就想去研究下RMQ中主動拉消息的消費者的源碼,開始在網上搜到的都是講DefaultMQPullConsumer的,而後我用的是RocketMq4.8.0的版本,在4.8中,這個類被標記爲將要廢棄,將被DefaultLitePullConsumer替代,因而今天就我們就研究下DefaultLitePullConsumer.app
和普通push模式的消費者同樣是建立好對象,而後start,它的區別是須要主動去拿消息,而後去消費.源碼分析
這裏面又有兩種拉取消息的方式,一種是assign,一種是subscribe.fetch
assign是指定哪些隊列去拉取,subscribe是隻指定topic,而後由均衡策略去全部隊列中選擇隊列拉取this
這是subscribespa
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); }
這是assign線程
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); litePullConsumer.setAutoCommit(false); litePullConsumer.start(); Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List<MessageQueue> list = new ArrayList<>(mqSet); List<MessageQueue> assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); }
首先看start方法幹了什麼code
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; //這裏是對consumerGroupName以及messageModel等一些基礎屬性作一下校驗 this.checkConfig(); //若是是集羣模式,就將實例名改成PID,由於通常沒設置instanceName的狀況下,默認是Default, //可是集羣下,不一樣的consumer節點須要區分開來,不能全設置爲Default if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultLitePullConsumer.changeInstanceNameToPID(); } //初始化客戶端實例,註冊爲消費者 initMQClientFactory(); //初始化消息的重平衡策略的配置, initRebalanceImpl(); initPullAPIWrapper(); //根據消息的消費模式,啓動不一樣的offsetStore對象 //集羣模式就啓動集羣模式的 //廣播模式啓動廣播模式的 initOffsetStore(); mQClientFactory.start(); //週期性獲取最新的Topic和與之對應的MessageQueue startScheduleTask(); this.serviceState = ServiceState.RUNNING; log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); //根據拉取消息的策略來執行,有兩種,一種是訂閱Topic,一種是本身分配messageQueue,也就是assign //若是是assign,須要在start consumer以前,本身先根據topic拿到全部的messageQueue的信息 //而後assign本身想要選擇的messageQueue //若是是subscribe,就會自動根據傳入的topic去namesrv拉取全部messageQueue的數據 operateAfterRunning(); break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }
第一遍看源碼的時候,就這麼大體過了一遍,而後接下來就是主動拉取消息的時候了:對象
List<MessageExt> messageExts = litePullConsumer.poll();
在最上面啓動消費者的代碼中有這麼一行,是主動拉取消息的方法,在之前的DefaultMQPullConsumer中,是能夠設置拉取多少條消息的,可是如今這個類不能設置了,我點進poll方法開始追蹤消息的拉取策略隊列
public synchronized List<MessageExt> poll(long timeout) { try { checkServiceState(); if (timeout < 0) throw new IllegalArgumentException("Timeout must not be negative"); //看是否設置了自動提交offset,若是是就根據當前時間判斷一下當前是否須要提交一次offset if (defaultLitePullConsumer.isAutoCommit()) { maybeAutoCommit(); } long endTime = System.currentTimeMillis() + timeout; //獲取拉取到的消息 ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() > 0) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() <= 0) break; } } if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { List<MessageExt> messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); //If namespace not null , reset Topic without namespace. this.resetTopic(messages); return messages; } } catch (InterruptedException ignore) { } return Collections.emptyList(); }
看到這裏我就發現不對勁了,ConsumeRequest是一個保存了消息數據的一個類,這個類竟然是從consumeRequestCache這麼一個BlockingQueue中取出來的
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
等等,不對勁啊,怎麼直接就從程序的隊列中取出來了,這啥時候放進去的啊,我都還沒開始拉取,怎麼就有消息了呢,因而我就追蹤了一下這個consumeRequestCache,發現他裏面的數據是經過PullTaskImpl這個線程類在啓動的時候放進去的,而後就發現了一個存放PullTaskImpl的隊列
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
最終追蹤到了一個startPullTask方法,在這個方法須要傳入一個集合的messageQueue,而後建立一個PullTaskImpl開始拉取消息,生成ConsumeRequest. 繼續追蹤發現有兩個地方使用到了這個startPullTask方法,一個是updateAssignPullTask,一個是updatePullTask,而後我一路追蹤這兩個方法的用途. 發現了updateAssignPullTask是在上面start方法中的operateAfterRunning()使用的,這下對應上了,在assign模式下,傳入了assign的messageQueues而後在start以後從這些queue拉取消息.
另外一個方法是在觸發消息分發重平衡的時候執行的.在RebalanceLitePullImpl中
這下主動拉取消息的邏輯明白了,原來是在consumer啓動的時候,若是指定了messageQueue,就開始從這些queue中拉取消息,若是沒有指定,那就在重平衡的時候從訂閱的Topic里拉取全部的messageQueue,而後再拉取消息,以後就經過poll方法來取.
對了還有很是重要的一點,主動拉取消息的狀況,是默認自動提交消費位點offset的,能夠經過setAutoCommit(false);來設置爲手動提交.消費完了記得調用commitSync();