消息隊列通常在消費端都會提供push和pull兩種模式,RocketMQ一樣實現了這兩種模式,分別提供了兩個實現類:DefaultMQPushConsumer和DefaultMQPullConsumer;兩種方式各有優點:
push模式:推送模式,即服務端有數據以後立馬推送消息給客戶端,須要客戶端和服務器創建長鏈接,實時性很高,對客戶端來講也簡單,接收處理消息便可;缺點就是服務端不知道客戶端處理消息的能力,可能會致使數據積壓,同時也增長了服務端的工做量,影響服務端的性能;
pull模式:拉取模式,即客戶端主動去服務端拉取數據,主動權在客戶端,拉取數據,而後處理數據,再拉取數據,一直循環下去,具體拉取數據的時間間隔很差設定,過短可能會致使大量的鏈接拉取不到數據,太長致使數據接收不及時;
RocketMQ使用了長輪詢的方式,兼顧了push和pull兩種模式的優勢,下面首先對長輪詢作簡單介紹,進而分析RocketMQ內置的長輪詢模式。git
長輪詢經過客戶端和服務端的配合,達到主動權在客戶端,同時也能保證數據的實時性;長輪詢本質上也是輪詢,只不過對普通的輪詢作了優化處理,服務端在沒有數據的時候並非立刻返回數據,會hold住請求,等待服務端有數據,或者一直沒有數據超時處理,而後一直循環下去;下面看一下如何簡單實現一個長輪詢;github
客戶端應該存在一個一直循環的程序,不停的向服務端發送獲取消息請求;express
服務器接收到客戶端請求以後,首先查看是否有數據,若是有數據則直接返回,若是沒有則保持鏈接,等待獲取數據,服務端獲取數據以後,會通知以前的請求鏈接來獲取數據,而後返回給客戶端;服務器
正常狀況下,客戶端會立刻接收到服務端的數據,或者等待一段時間獲取到數據;若是一直獲取不到數據,會有超時處理;在獲取數據或者超時處理以後會關閉鏈接,而後再次發起長輪詢請求;app
如下使用netty模擬一個http服務器,使用HttpURLConnection模擬客戶端發送請求,使用BlockingQueue存放數據;dom
服務端代碼ide
public class Server { public static void start(final int port) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup woker = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.channel(NioServerSocketChannel.class).group(boss, woker) .childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-decoder", new HttpServerCodec()); ch.pipeline().addLast(new HttpServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(port).sync(); System.out.println("server start ok port is " + port); DataCenter.start(); future.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); woker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { start(8080); } }
netty默認支持http協議,直接使用便可,啓動端口爲8080;同時啓動數據中心服務,相關代碼以下:oop
public class DataCenter { private static Random random = new Random(); private static BlockingQueue<String> queue = new LinkedBlockingQueue<>(); private static AtomicInteger num = new AtomicInteger(); public static void start() { while (true) { try { Thread.sleep(random.nextInt(5) * 1000); String data = "hello world" + num.incrementAndGet(); queue.put(data); System.out.println("store data:" + data); } catch (InterruptedException e) { e.printStackTrace(); } } } public static String getData() throws InterruptedException { return queue.take(); } }
爲了模擬服務端沒有數據,須要等待的狀況,這裏使用BlockingQueue來模擬,不按期的往隊列裏面插入數據,同時對外提供獲取數據的方法,使用的是take方法,沒有數據會阻塞知道有數據爲止;getData在類HttpServerHandler中使用,此類也很簡單,以下:性能
public class HttpServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpRequest) { FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); httpResponse.content().writeBytes(DataCenter.getData().getBytes()); httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes()); ctx.writeAndFlush(httpResponse); } } }
獲取到客戶端的請求以後,從數據中心獲取一條消息,若是沒有數據,會進行等待,直到有數據爲止;而後使用FullHttpResponse返回給客戶端;客戶端使用HttpURLConnection來和服務端創建鏈接,不停的拉取數據,代碼以下:優化
public class Client { public static void main(String[] args) { while (true) { HttpURLConnection connection = null; try { URL url = new URL("http://localhost:8080"); connection = (HttpURLConnection) url.openConnection(); connection.setReadTimeout(10000); connection.setConnectTimeout(3000); connection.setRequestMethod("GET"); connection.connect(); if (200 == connection.getResponseCode()) { BufferedReader reader = null; try { reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuffer result = new StringBuffer(); String line = null; while ((line = reader.readLine()) != null) { result.append(line); } System.out.println("時間:" + new Date().toString() + "result = " + result); } finally { if (reader != null) { reader.close(); } } } } catch (IOException e) { e.printStackTrace(); } finally { if (connection != null) { connection.disconnect(); } } } } }
以上只是簡單的模擬了長輪詢的方式,下面重點來看看RocketMQ是如何實現長輪詢的;
RocketMQ的消費端提供了兩種消費模式分別是:DefaultMQPushConsumer和DefaultMQPullConsumer,其中DefaultMQPushConsumer就是使用的長輪詢,因此下面重點分析此類;
從名字能夠看出來就是客戶端從服務端拉取數據的服務,看裏面的一個核心方法:
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
服務啓動以後,會一直不停的循環調用拉取數據,PullRequest能夠看做是拉取數據須要的參數,部分代碼以下:
public class PullRequest { private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean lockedFirst = false; ...省略... }
每一個MessageQueue 對應了封裝成了一個PullRequest,由於拉取數據是以每一個Broker下面的Queue爲單位,同時裏面還一個ProcessQueue,每一個MessageQueue也一樣對應一個ProcessQueue,保存了這個MessageQueue消息處理狀態的快照;還有nextOffset用來標識讀取的位置;繼續看一段pullMessage中的內容,給服務端發送請求的頭內容:
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult;
其中有一個參數是SuspendTimeoutMillis,做用是設置Broker的最長阻塞時間,默認爲15秒,前提是沒有消息的狀況下,有消息會馬上返回;
從名字能夠看出,服務端用來處理pullMessage的服務,下面重點看一下processRequest方法,其中包括對獲取不一樣結果作的處理:
switch (response.getCode()) { case ResponseCode.SUCCESS: ...省略... break; case ResponseCode.PULL_NOT_FOUND: if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; } case ResponseCode.PULL_RETRY_IMMEDIATELY: break; case ResponseCode.PULL_OFFSET_MOVED: ...省略... break; default: assert false;
一共處理了四個類型,咱們關心的是在沒有獲取到數據的狀況下是如何處理的,能夠重點看一下ResponseCode.PULL_NOT_FOUND,表示沒有拉取到數據,此時會調用PullRequestHoldService服務,從名字能夠看出此服務用來hold住請求,不會立馬返回,response被至爲了null,不給客戶端響應;下面重點看一下PullRequestHoldService:
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
此方法主要就是經過不停的檢查被hold住的請求,檢查是否已經有數據了,具體檢查哪些就是在ResponseCode.PULL_NOT_FOUND中調用的suspendPullRequest方法:
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); }
將須要hold處理的PullRequest放入到一個ConcurrentHashMap中,等待被檢查;具體的檢查代碼在checkHoldRequest中:
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); try { this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } }
此方法用來獲取指定messageQueue下最大的offset,而後用來和當前的offset來比較,來肯定是否有新的消息到來;往下看notifyMessageArriving方法:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
方法中兩個重要的斷定就是:比較當前的offset和maxoffset,看是否有新的消息到來,有新的消息返回客戶端;另一個就是比較當前的時間和阻塞的時間,看是否超過了最大的阻塞時間,超過也一樣返回;
此方法不光在PullRequestHoldService服務類中循環調用檢查,同時在DefaultMessageStore中消息被存儲的時候調用;其實就是主動檢查和被動通知兩種方式。
服務端處理完以後,給客戶端響應,回調其中的PullCallback,其中在處理完消息以後,重要的一步就是再次把pullRequest放到PullMessageService服務中,等待下一次的輪詢;
本文首先介紹了兩種消費消息的模式,介紹了其中的優缺點,而後引出了長輪詢,而且在本地簡單模擬了長輪詢,最後重點介紹了RocketMQ中是如何實現的長輪詢。