Rocket源碼分析之-DefaultPushConsumer 拉取源碼分析

這是一段RocketMq經典的consumer異步獲取broker消息的代碼:java


    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");    consumer.setNamesrvAddr(Constants.NameServerAddr);    consumer.subscribe("topic01","*");    consumer.setMessageModel(MessageModel.BROADCASTING);//廣播消息,全部相同組,定於topic的消費端都能收到消息    //consumer.setMessageModel(MessageModel.CLUSTERING);//集羣消息--默認(相同組內的topic,集羣消息只有一端會接收到)    consumer.registerMessageListener(new MessageListenerConcurrently(){      @Override      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,          ConsumeConcurrentlyContext consumeConcurrentlyContext) {        for (MessageExt messageExt:list){          System.out.println(new java.lang.String(messageExt.getBody()));        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      }    });    consumer.start();  }


consumer start()方法跟蹤app

  1. this.defaultMQPushConsumerImpl.start();異步

  2. 剛啓動serviceState狀態爲 CREATE_JUST,進入這個狀態的switch處理邏輯ide

3. 先用checkCoing()檢查consumer的各個配置是否配置ok
this

4. 而後 copySubscription()用於根據subject構建本地的rebalance的conhurrentHashMapInnerspa

5. 接着構建MqClientFactory的一個Instance3d

6. 構建PullWrapper,用於去Broker註冊過濾消息
orm

7. 再根據MessageMode是廣播模式仍是集羣模式獲取offset。(廣播模式是從consumer本地的store獲取,集羣模式則是須要去broker去請求獲取)對象

8. 根據監聽消息的類型是OrderLy仍是Concurrently去構建一個consumeMessageService對象
blog

9.啓動剛纔建立的consumerMessageService對象,調用其start方法

10. 使用MqClientFactory Instance實例registerConsumer進行註冊

11. 把當前的serviceState狀態變爲Running狀態

12.而後就開始從broker獲取消息,請看下面的pushConsumer拉取消息流程


pushConsumer拉取消息流程介紹 

consumer  --DefaultMqPushConsumerImpl 使用pullMessage(pullRequest)拉取消息,pullAPIWrapper.pullKernelImpl(傳遞pullReuest,回調callback等參數)根據是否同步pullMessageSync仍是異步pullMessageAsync, 拉取回來的消息PullResult通過解析處理存放到ProcessQueue 隊列裏的TreeMap(offset,messageExt)


代碼流程圖 


圖片

相關文章
相關標籤/搜索