RocketMq源碼分析之DefaultLitePullConsumer主動拉取消息分析consumeRequestCache

RocketMq源碼分析之DefaultLitePullConsumer主動拉取消息分析consumeRequestCachejava

由於作RMQ的proxy的消費者的時候,消息消費只能主動拉取,而後就想去研究下RMQ中主動拉消息的消費者的源碼,開始在網上搜到的都是講DefaultMQPullConsumer的,而後我用的是RocketMq4.8.0的版本,在4.8中,這個類被標記爲將要廢棄,將被DefaultLitePullConsumer替代,因而今天就我們就研究下DefaultLitePullConsumer.app

和普通push模式的消費者同樣是建立好對象,而後start,它的區別是須要主動去拿消息,而後去消費.源碼分析

這裏面又有兩種拉取消息的方式,一種是assign,一種是subscribe.fetch

assign是指定哪些隊列去拉取,subscribe是隻指定topic,而後由均衡策略去全部隊列中選擇隊列拉取this

這是subscribespa

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);

            }
        } finally {
            litePullConsumer.shutdown();
        }

這是assign線程

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        litePullConsumer.start();
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        litePullConsumer.assign(assignList);
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }

首先看start方法幹了什麼code

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                //這裏是對consumerGroupName以及messageModel等一些基礎屬性作一下校驗
                this.checkConfig();

                //若是是集羣模式,就將實例名改成PID,由於通常沒設置instanceName的狀況下,默認是Default,
                //可是集羣下,不一樣的consumer節點須要區分開來,不能全設置爲Default
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }

                //初始化客戶端實例,註冊爲消費者
                initMQClientFactory();

                //初始化消息的重平衡策略的配置,
                initRebalanceImpl();

                initPullAPIWrapper();

                //根據消息的消費模式,啓動不一樣的offsetStore對象
                //集羣模式就啓動集羣模式的
                //廣播模式啓動廣播模式的
                initOffsetStore();

                mQClientFactory.start();

                //週期性獲取最新的Topic和與之對應的MessageQueue
                startScheduleTask();

                this.serviceState = ServiceState.RUNNING;

                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());

                //根據拉取消息的策略來執行,有兩種,一種是訂閱Topic,一種是本身分配messageQueue,也就是assign
                //若是是assign,須要在start consumer以前,本身先根據topic拿到全部的messageQueue的信息
                //而後assign本身想要選擇的messageQueue
                //若是是subscribe,就會自動根據傳入的topic去namesrv拉取全部messageQueue的數據
                operateAfterRunning();

                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

第一遍看源碼的時候,就這麼大體過了一遍,而後接下來就是主動拉取消息的時候了:對象

List<MessageExt> messageExts = litePullConsumer.poll();

在最上面啓動消費者的代碼中有這麼一行,是主動拉取消息的方法,在之前的DefaultMQPullConsumer中,是能夠設置拉取多少條消息的,可是如今這個類不能設置了,我點進poll方法開始追蹤消息的拉取策略隊列

public synchronized List<MessageExt> poll(long timeout) {
        try {
            checkServiceState();
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            //看是否設置了自動提交offset,若是是就根據當前時間判斷一下當前是否須要提交一次offset
            if (defaultLitePullConsumer.isAutoCommit()) {
                maybeAutoCommit();
            }
            long endTime = System.currentTimeMillis() + timeout;

            //獲取拉取到的消息
            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

            if (endTime - System.currentTimeMillis() > 0) {
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0)
                        break;
                }
            }

            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                //If namespace not null , reset Topic without namespace.
                this.resetTopic(messages);
                return messages;
            }
        } catch (InterruptedException ignore) {

        }

        return Collections.emptyList();
    }

看到這裏我就發現不對勁了,ConsumeRequest是一個保存了消息數據的一個類,這個類竟然是從consumeRequestCache這麼一個BlockingQueue中取出來的

private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

等等,不對勁啊,怎麼直接就從程序的隊列中取出來了,這啥時候放進去的啊,我都還沒開始拉取,怎麼就有消息了呢,因而我就追蹤了一下這個consumeRequestCache,發現他裏面的數據是經過PullTaskImpl這個線程類在啓動的時候放進去的,而後就發現了一個存放PullTaskImpl的隊列

private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();

最終追蹤到了一個startPullTask方法,在這個方法須要傳入一個集合的messageQueue,而後建立一個PullTaskImpl開始拉取消息,生成ConsumeRequest. 繼續追蹤發現有兩個地方使用到了這個startPullTask方法,一個是updateAssignPullTask,一個是updatePullTask,而後我一路追蹤這兩個方法的用途. 發現了updateAssignPullTask是在上面start方法中的operateAfterRunning()使用的,這下對應上了,在assign模式下,傳入了assign的messageQueues而後在start以後從這些queue拉取消息.

另外一個方法是在觸發消息分發重平衡的時候執行的.在RebalanceLitePullImpl中

這下主動拉取消息的邏輯明白了,原來是在consumer啓動的時候,若是指定了messageQueue,就開始從這些queue中拉取消息,若是沒有指定,那就在重平衡的時候從訂閱的Topic里拉取全部的messageQueue,而後再拉取消息,以後就經過poll方法來取.

對了還有很是重要的一點,主動拉取消息的狀況,是默認自動提交消費位點offset的,能夠經過setAutoCommit(false);來設置爲手動提交.消費完了記得調用commitSync();

相關文章
相關標籤/搜索