這節介紹Consumer接收消息的流程,分爲Pull和Push模式。segmentfault
上一節講Rebalance時提到,Consumer接受客戶端有兩種方式:緩存
其中1.的通知到達Consumer後,會當即觸發Rebalance,而後會重置2.的定時器等待時間。兩者最後通知Consumer的方式爲微信
executePullRequestImmediately的內容爲:app
public void executePullRequestImmediately(final PullRequest pullRequest) { this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); }
即將PullRequest對象傳給了PullMessageService的executePullRequestImmediately方法:框架
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
PullMessageService的結構以下:異步
內部維護着一個LinkedBlockingQueue屬性pullRequestQueue,用於存儲待處理的PullRequest;還有一個ScheduledExecutorService,用於延期處理PullRequest。具體流程以下:ide
從上面的過程能夠看出,Push模式內部仍是客戶端主動去拉取的,即所謂的封裝拉模式以實現推模式,簡單示意圖以下:fetch
內部經過PullMessageService循環的從PullRequest對應MessageQueue中主動拉取數據。this
該方法用於完成從MessageQueue拉取消息的過程,主要過程以下:spa
進行一系列的檢查,若是檢查不經過,則等待必定時間後再放回PullMessageService的待處理隊列中,主要是經過PullMessageService中的ScheduledExecutorService來作到延遲執行,涉及的狀況包括:
上面經過PullAPIWrapper收到結果後會將結果包裝爲PullResult對象並回調PullCallback。PullCallback和PullResult的定義以下:
public interface PullCallback { void onSuccess(final PullResult pullResult); void onException(final Throwable e); }
public class PullResult { private final PullStatus pullStatus;//請求狀態 private final long nextBeginOffset;//Broker返回的下一次開始消費的offset private final long minOffset; private final long maxOffset; private List<MessageExt> msgFoundList;//消息列表,一次請求返回一批消息 }
下面爲pullMessage方法處理異步返回結果的流程:
根據請求狀態進行處理
有新消息(FOUND)
沒有新消息(NO_NEW_MSG)
沒有匹配的消息(NO_MATCHED_MSG)
不合法的偏移量(OFFSET_ILLEGAL)
下面先介紹下ProcessQueue,這裏只標識幾個相關的屬性:
public class ProcessQueue { private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); //緩存的待消費消息,按照消息的起始offset排序 private final TreeMap</*消息的起始offset*/Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); //緩存的待消費消息數量 private final AtomicLong msgCount = new AtomicLong(); //緩存的待消費消息大小 private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); /** * A subset of msgTreeMap, will only be used when orderly consume */ private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); private volatile long queueOffsetMax = 0L; private volatile boolean dropped = false; //最近執行pull的時間 private volatile long lastPullTimestamp = System.currentTimeMillis(); //最近被客戶端消費的時間 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); private volatile boolean locked = false; private volatile long lastLockTimestamp = System.currentTimeMillis(); //當前是否在消費,用於順序消費模式,對並行消費無效 private volatile boolean consuming = false; private volatile long msgAccCnt = 0; }
ProcessQueue展現了MessageQueue的消費狀況。上面提到,發起pull請求後若是有數據,會先放到ProcessQueue的緩存中,即msgTreeMap屬性,於是緩存的消息會按照消息的起始offset被排序存儲。經過ProcessQueue能夠查看MessageQueue當前的處理狀況,ProcessQueue還用於輔助實現順序消費。
異步返回的消息內容將交給ConsumeMessageService處理,ConsumeMessageService是個接口,方法定義以下:
public interface ConsumeMessageService { void start(); void shutdown(); void updateCorePoolSize(int corePoolSize); void incCorePoolSize(); void decCorePoolSize(); int getCorePoolSize(); ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume); }
經過定義可見,要求實現類提供異步處理的功能。內部提供的實現類有:
ConsumeMessageConcurrentlyService:並行消費;ConsumeMessageOrderlyService:順序消費,這裏重點看ConsumeMessageConcurrentlyService。異步請求後會將拉取的新消息列表交給submitConsumeRequest方法處理,以下:
該方法會將傳入的消息列表分裝爲一個ConsumeRequest,並提到到線程池中等待處理。若是傳入的消息列表長度超過設定值(默認爲1),則會分多個批處理。
在介紹消費具體過程以前先回顧客戶端啓動流程的Demo,接收消息的寫法以下:
public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest"); // 設置NameServer的地址 consumer.setNamesrvAddr ("localhost:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾須要消費的消息 consumer.subscribe ("TopicTest", "*"); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs); // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啓動消費者實例 consumer.start (); System.out.printf ("Consumer Started.%n"); } }
其中註冊了一個MessageListenerConcurrently,該類將用於用戶端處理消息。
回過來看ConsumeRequest,該類實現了Runnable接口,會在run方法完成主要的處理工做,主要動做爲:
用戶真正接收消息並執行處理動做的地方,須要返回ConsumeConcurrentlyStatus告知框架處理結果。這裏在方法裏最好不要作耗時長的任務,快速處理後返回給框架結果,避免消息堆積在線程池中。能夠將消息內容複製一遍後再放到線程池中進行分發處理。
該方法主要在用戶消費完數據後進行收尾動做,過程以下:
ConsumerRequest在run方法的開始處,實例化了一個ConsumeConcurrentlyContext對象,用於後續傳遞內容,該定義爲:
public class ConsumeConcurrentlyContext { private final MessageQueue messageQueue; //重試的延遲級別,-1:不重試;0:由broker控制;>0由客戶端控制 private int delayLevelWhenNextConsume = 0; //消息列表最後一個正常消費的消息索引號 private int ackIndex = Integer.MAX_VALUE; }
其中ackIndex表示最後一個正常消費的消息索引號(0從開始,0~ackIndex爲正常消費),該位置後的消息表示無法正常消費。該值由用戶端控制,能夠經過ackIndex來控制須要重發的消息。
ackIndex默認值爲Integer.MAX_VALUE,若是爲該值則認爲全部消息正常消費,不存在錯誤。上面流程中統計成功和失敗也是根據ackIndex來判斷的,對於ackIndex後的消息,若是是集羣消費模式,則會先嚐試發送回broker,由broker控制重試時機;若是重試失敗,會收集這些失敗的消息,延遲5秒後再調用一次ConsumeMessageService.submitConsumeRequest讓用戶端再次處理。最後會將處理成功的消息從ProcessQueue中移除,更新緩存,而後將q消費的偏移量記錄下來,等待後臺線程同步到broker或者本地。
綜合上面的介紹,Push模式下的處理流程大體以下:
Push模式經過PullMessageService循環從監聽的MessageQueue中以Pull模式拉取消息,並分發給用戶註冊的MesageListenerConsurrently對象處理,完了以後會自動處理消息的重試,offset更新等動做,從而模擬消息從Broker端主動推進過來。
同Push模式同樣,Pull模式的觸發也是經過Rebalance,以下:
同開頭說起的同樣,會回調DefaultMQPullConsumerImpl的MessageQueueListener有Queue發生改變。
系統提供了MQPullConsumerScheduleService,能夠定時以Pull模式拉取消息,並將結果通知MessageQueueListener,內部的實現爲:
class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {//mqAll該topic下的全部q,mqDivided該實例分配到的q MessageModel messageModel = MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: MQPullConsumerScheduleService.this.putTask(topic, mqAll);//通知該topic下的監聽器,最新的全部q break; case CLUSTERING: MQPullConsumerScheduleService.this.putTask(topic, mqDivided);//通知該topic下的監聽器,該實例分配的q break; default: break; } } }
putTask會將分配到的新的MessageQueue包裝爲一個PullTaskImpl,PullTaskImpl實現了Runnable,會在後臺一直執行;而將不屬於本身處理的MessageQueue對應的PullTaskImpl停掉。PullTaskImpl會查找該MessageQueue所監聽topic對應的處理類PullTaskCallback,調用doPullTask,將具體動做讓用戶處理。
MQPullConsumerScheduleService的例子爲:
public class PullScheduleService { public static void main(String[] args) throws MQClientException { final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {//註冊topic的監聽器 @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { long offset = consumer.fetchConsumeOffset(mq, false); if (offset < 0) offset = 0; PullResult pullResult = consumer.pull(mq, "*", offset, 32); System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break; default: break; } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//上報消費的offset,消費完後要主動上報 context.setPullNextDelayTimeMillis(100);//設置下一次觸發間隔 } catch (Exception e) { e.printStackTrace(); } } }); scheduleService.start(); } }
也能夠本身手動執行pull,以下面的例子:
public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
相較於Push模式,Pull模式則須要用戶本身控制消息的重試,offset更新等動做。下面附上該部分當時源碼閱讀過程作的筆記簡圖:
更多原創內容請搜索微信公衆號:啊駝(doubaotaizi)