rocketmq之源碼分析consumer源碼解析註釋(十一)

consumer的常規訂閱消息的整體操做流程:api

構造初始化-》註冊監聽-》啓動-》無限循環請求隊列-》長鏈接的數據拉取安全

一,構造初始化
    DefaultMQPushConsumer:常規的訂閱消息,須要制定惟一的分組名稱
        最終構造的對象是DefaultMQPushConsumerImpl:
            核心的功能實現,整合內部多個組件
    配置消息的消息開始位置:ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
    配置namesrv地址:拉取對應的配置內容和關係結構
    訂閱內容:基於特定topic的訂閱,而後內置了表達式引擎(過濾內容)
    註冊監聽:該監聽是監聽到有消息時的自有業務邏輯處理網絡

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wang-group-consmuer");

/*
 * Specify where to start in case the specified consumer group is a brand new one.
 */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//指定nameserver的地址,全部的數據交互都是基於nameserve來進行的信息獲取和更新及心跳
consumer.setNamesrvAddr("127.0.0.1:9876");

/*
 * Subscribe one more more topics to consume.
 */
consumer.subscribe("TopicTest", "*");

/*
 *  Register callback to execute on arrival of messages fetched from brokers.
 */
consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

/*
 *  Launch the consumer instance.
 */
consumer.start();

構造函數的初始化數據結構

public DefaultMQPushConsumer(final String consumerGroup) {
    this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

實際實現服務的核心屬性併發

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    /**
     * Delay some time when exception occur
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
    /**
     * Delay some time when suspend pull service
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
    private final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    //負載的選擇器
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final RPCHook rpcHook;
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;

    //客戶端實例,複用
    private MQClientInstance mQClientFactory;
    //拉取服務請求的包裝
    private PullAPIWrapper pullAPIWrapper;
    private volatile boolean pause = false;
    private boolean consumeOrderly = false;
    //消息監聽
    private MessageListener messageListenerInner;
    //消息的偏移位置
    private OffsetStore offsetStore;
    //客戶端服務
    private ConsumeMessageService consumeMessageService;
    private long queueFlowControlTimes = 0;
    private long queueMaxSpanFlowControlTimes = 0;

二,app

啓動
    start:最終的核心啓動是DefaultMQPushConsumerImpl的啓動
        啓動和消息發送端的啓動相似,進行對應的初始化及啓動操做
            配置服務的執行狀態,內部拆分了四個狀態機制
            驗證關鍵配置,主要是影響到和新功能的配置的內容
            將配置的訂閱信息拷貝到對應的監聽,對應的負載,消費等數據對象中
            生成實例id
            得到MQClient的實例工程,和producer一致,關鍵的核心操做實現,綜合體
            將核心的配置注入到負載服務中
            包裝拉取服務,包裝成獨立的綜合體
            得到消息的開始消費偏移位置,用於消息拉取的請求參數
            根據消息的監聽類型,設置,分爲兩類,順序監聽和併發監聽,併發監聽的效率更高
            註冊消費處理
            實例MQClient的工程方法啓動
            標示爲當前服務爲運行狀態負載均衡

            更新訂閱的topic信息
            檢查使用的broker信息
            發送心跳內容給全部的broker
            立刻調用負載服務平衡框架

public void start() throws MQClientException {
    //消費端的核心啓動入口
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

核心功能的操做啓動ide

//消費端主動推送的啓動核心入口
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            //驗證核心關鍵配置
            this.checkConfig();

            //將配置中的訂閱信息拷貝到推送服務中
            this.copySubscription();

            //生成id
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            //得到客戶端工廠,共享的客戶端工程單例設計
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            //將配置中的核心配置賦值給負載中
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            //拉取消息的包裝api
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            //設置消息的開始位置
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            //加載消息讀取位置信息
            this.offsetStore.load();

            //消息接受器的監聽類型
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            //消費者監聽服務啓動
            this.consumeMessageService.start();

            //註冊服務處理事件
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            //連接工程的啓動
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    //更新topic信息到當前的內存數據結構中,便於後期的直接使用
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    //檢查須要連接的broker是否可用
    this.mQClientFactory.checkClientInBroker();
    //發送心跳數據給當前服務所連接的全部broker中,操做過程是基於安全鎖機制
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    //喚醒負載平衡操做
    this.mQClientFactory.rebalanceImmediately();
}

三,監聽
    業務根據RocketMQ的規範,根據業務特定實現的接口
    接口分爲兩類,順序消息,並行消費函數

//消息接受器的監聽類型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
            //有序消息
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
            //並行消息
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

並行消息

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final InternalLogger log = ClientLogger.getLog();

    //消息的實際拉取操做
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    //消費消息的頂層結構
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    //並行消息接聽
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    //線程池執行配置
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;

    //任務調用
    private final ScheduledExecutorService scheduledExecutorService;
    //任務調度清除超時
    private final ScheduledExecutorService cleanExpireMsgExecutors;

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                //清除超時消息
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

四,長鏈接的拉取
    啓動入口在MQClient的連接工程的啓動中
        在啓動中指定負載服務的啓動中RebalanceService
            在操做中this.waitForRunning(waitInterval);默認是阻塞20000,可是在服務啓動後,執行了this.mQClientFactory.rebalanceImmediately();
            核心操做在重置負載中this.mqClientFactory.doRebalance();
            有一個前提,根據當前服務的模式,分爲push自動領取,pull程序主動發起,還有一個條件就是是不是順序消息
            遍歷訂閱信息,從新負載topic信息
            最核心的操做在更新queue到負載中,完成了初始化
            此時調用的是PullMessageService的執行拉取操做,該對象是核心
            後期的操做就是自動化請求,基於隊列的形式,持續的請求,完成後再放置下一次請求到隊列中,循環請求

//連接工程的啓動
mQClientFactory.start();
//初始化客戶端請求實例
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    //mq的核心配置信息
    this.clientConfig = clientConfig;
    //當前進程內的惟一標識,升序數值
    this.instanceIndex = instanceIndex;
    //netty通訊的客戶端配置
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());

    //解析客戶端請求,封裝的事件處理
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);

    //客戶端實例的實際實現,網絡通訊的核心,只是初始化了通訊框架,具體的連接後面根據不一樣的地址再進行連接操做
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    //設置核心的nameserv地址
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    //mq管理
    this.mQAdminImpl = new MQAdminImpl(this);

    //拉取消息的實現
    this.pullMessageService = new PullMessageService(this);

    //負載均衡的實現,可能有相關的機器增長刪除,須要按期的進行重負載操做
    this.rebalanceService = new RebalanceService(this);

    //消息發送者的包裝,發送者的發送者,這個邏輯有點亂,而且在構造方法中從新初始化的
    //producer -> DefaultMQProducer -> DefaultMQProducerImpl -> MQClientInstance -> DefaultMQProducer
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    //消費客戶端的狀態管理
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
//啓動發送消息的核心,同時也是訂閱消息的核心
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                //啓動netty的客戶端配置
                // Start request-response channel
                this.mQClientAPIImpl.start();

                //啓動任務,更新,驗證,心跳等操做
                // Start various schedule tasks
                this.startScheduledTask();

                //消費端
                // Start pull service
                this.pullMessageService.start();

                //消費端,從新負載設置,請求的初始化操做也在此方法內執行
                // Start rebalance service
                this.rebalanceService.start();

                //前面已經初始化過操做,該入參爲false,只須要初始化其餘操做
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

初始化操做類RebalanceService

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    //消息的網絡操做及功能
    private final MQClientInstance mqClientFactory;

    //負載服務的初始化,構造是鏈接服務的實例,當前是線程的子類
    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            //等待執行
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

表現的狀態是在waiting,而後再指定負載,執行的負載操做是

//負載平衡重置
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

此時consumerTable中的數據在前面的初始化啓動中進行了註冊操做

//註冊服務處理事件
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

執行平臺操做

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}
public void doRebalance(final boolean isOrder) {
    //得到訂閱的內容
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //操做對topic的負載
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    //基於數據的統一處理
    this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        //默認的集羣模式
        case CLUSTERING: {
            //得到當前topic的訂閱隊列信息
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            //請求得到topic的cid請求
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                //排序操做
                Collections.sort(mqAll);
                Collections.sort(cidAll);

                //選擇cid的策略
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                //是否有變化
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    //消息隊列的修改
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

主要的操做在修改處理消息隊列的變化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    //得到處理隊列的數據
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    //遍歷隊列同時對消息對鞋進行修正
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        //針對相同的topic的處理
        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    //構造拉取消息的請求
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    //封裝拉取的請求結構
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    //處理消息拉取請求
    this.dispatchPullRequest(pullRequestList);

    return changed;
}

拉取的操做this.dispatchPullRequest(pullRequestList);是抽象的設計,須要根據當前的實現類進行實現

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        //實行消息的拉取操做
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

逐層調用實現

public void executePullRequestImmediately(final PullRequest pullRequest) {
    this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
//將請求直接添加到隊列中,這裏確定是第一次初始化和後來無限次操做的入口
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

將最終的拉取請求添加的請求隊列中,等待請求隊列的掃描和執行

 最終的執行開始及操做,在啓動的最後一步執行this.mQClientFactory.rebalanceImmediately();

//喚醒負載
public void rebalanceImmediately() {
    this.rebalanceService.wakeup();
}
public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

該喚醒操做和簽名的waiting操做是一一對應的操做,待服務徹底啓動後,執行拉取的喚醒操做

相關文章
相關標籤/搜索