這是一段RocketMq經典的consumer異步獲取broker消息的代碼:java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr(Constants.NameServerAddr); consumer.subscribe("topic01","*"); consumer.setMessageModel(MessageModel.BROADCASTING);//廣播消息,全部相同組,定於topic的消費端都能收到消息 //consumer.setMessageModel(MessageModel.CLUSTERING);//集羣消息--默認(相同組內的topic,集羣消息只有一端會接收到) consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt:list){ System.out.println(new java.lang.String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
consumer start()方法跟蹤app
1. this.defaultMQPushConsumerImpl.start();異步
2. 剛啓動serviceState狀態爲 CREATE_JUST,進入這個狀態的switch處理邏輯ide
3. 先用checkCoing()檢查consumer的各個配置是否配置ok
this
4. 而後 copySubscription()用於根據subject構建本地的rebalance的conhurrentHashMapInnerspa
5. 接着構建MqClientFactory的一個Instance3d
6. 構建PullWrapper,用於去Broker註冊過濾消息
orm
7. 再根據MessageMode是廣播模式仍是集羣模式獲取offset。(廣播模式是從consumer本地的store獲取,集羣模式則是須要去broker去請求獲取)對象
8. 根據監聽消息的類型是OrderLy仍是Concurrently去構建一個consumeMessageService對象
blog
9.啓動剛纔建立的consumerMessageService對象,調用其start方法
10. 使用MqClientFactory Instance實例registerConsumer進行註冊
11. 把當前的serviceState狀態變爲Running狀態
12.而後就開始從broker獲取消息,請看下面的pushConsumer拉取消息流程
pushConsumer拉取消息流程介紹
consumer --DefaultMqPushConsumerImpl 使用pullMessage(pullRequest)拉取消息,pullAPIWrapper.pullKernelImpl(傳遞pullReuest,回調callback等參數)根據是否同步pullMessageSync仍是異步pullMessageAsync, 拉取回來的消息PullResult通過解析處理存放到ProcessQueue 隊列裏的TreeMap(offset,messageExt)
代碼流程圖