Rocketmq源碼解讀之消息拉取

最近閱讀了Rocketmq關於pullmessage的實現方式,分享出來緩存

 

衆所周知,Rocketmq在consumer端是拉取消息的方式,它會在客戶端維護一個PullRequestQueue,這個是一個阻塞隊列(LinkedBlockingQueue),內部的節點是PullRequest,每個PullRequest表明了一個消費的分組單元框架

 

PullRequest會記錄一個topic對應的consumerGroup的拉取進度,包括MessageQueue和PorcessQueue,還有拉取的offset異步

(代碼片斷一)
public
class PullRequest { private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean lockedFirst = false; }

其中MessageQueue記錄元信息:ide

(代碼片斷二)
public
class MessageQueue implements Comparable<MessageQueue>, Serializable { private String topic; private String brokerName; private int queueId; }

PorcessQueue記錄一次拉取以後實際消息體和拉取相關操做記錄的快照ui

(代碼片斷三)
public
class ProcessQueue { private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong msgCount = new AtomicLong(); private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); private volatile long queueOffsetMax = 0L; private volatile boolean dropped = false; private volatile long lastPullTimestamp = System.currentTimeMillis(); private volatile long lastConsumeTimestamp = System.currentTimeMillis(); private volatile boolean locked = false; private volatile long lastLockTimestamp = System.currentTimeMillis(); private volatile boolean consuming = false; private volatile long msgAccCnt = 0; }

 

 

PullMessageService負責輪詢PullRequestQueue,並進行消息元的拉取this

(代碼片斷四)
public
class PullMessageService extends ServiceThread { private final InternalLogger log = ClientLogger.getLog(); private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>(); private final MQClientInstance mQClientFactory; @Override public void run() { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } } }

 

在發送的時候會維護一個PullCallback,這是拉取收到消息後的回調處理spa

(代碼片斷五)
public
interface PullCallback { void onSuccess(final PullResult pullResult); void onException(final Throwable e); }

這裏的實現邏輯就不貼了,本質上就是把消息丟給消費線程池來處理線程

 

pullMessage分爲同步拉取和異步拉取兩種模式,先解讀異步拉取,而後再解讀同步拉取,再說明二者的區別netty

其實從這裏已經大概能夠看出來,異步的方式,這個方法返回值是null,同步的方式必需要返回PullResult,後續說明區別code

(代碼片斷六)
public
PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }

 

先介紹異步拉取

能夠看到,把PullCallback傳進去,並封裝了InvokeCallback,

(代碼片斷七)
private
void pullMessageAsync( final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback ) throws RemotingException, InterruptedException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response); assert pullResult != null; pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } }); }

 

接下來進入NettyRemotingAbstract這個類中,使用netty的Chanle發送

(代碼片斷八)
public
void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseFuture.putResponse(null); responseTable.remove(opaque); try { executeInvokeCallback(responseFuture); } catch (Throwable e) { log.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }

 

這裏須要詳細解讀一下:

RemotingCommand是request和response的載體,先從request中取出opaque,這是一個自增的操做id,而後將傳進來的opaque和invokeCallback封裝成一個ResponseFuture,再put到一個叫

responseTable的map中,這個map是一個核心的map,維護着opaque和對應的ResponseFuture

    /**
     * This map caches all on-going requests.
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

從註釋中能夠看出,它緩存着正在執行的request

 

再回到剛剛的(代碼片斷八)中,channel.writeAndFlush(request).addListener(new ChannelFutureListener(){...}),netty在writeAndFlush發送完以後會回調咱們ChannelFutureListener的operationComplete方法:若是發送成功則responseFuture.setSendRequestOK(true); 而且就return了;若是發送失敗,則從responseTable中移除,而且起一個異步線程執行responseFuture中的InvokeCallback,在(代碼片斷七)中,能夠看到當responseFuture.isSendRequestOK()是false的時候,執行了onException,這裏就很少介紹了。

 

那麼此時發送的邏輯就所有結束了,整個過程沒有任何的阻塞,當Broker收到拉取請求後,會按照queueOffset等信息封裝好返回consumer端,

會通過NettyRemotingServer上註冊的NettyServerHandler

(代碼片斷九)
class
NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processTunnelId(ctx, msg); processMessageReceived(ctx, msg); } public void processTunnelId(ChannelHandlerContext ctx, RemotingCommand msg) { if (nettyServerConfig.isValidateTunnelIdFromVtoaEnable()) { if (null != msg && msg.getType() == RemotingCommandType.REQUEST_COMMAND) { Vtoa vtoa = tunnelTable.get(ctx.channel()); if (null == vtoa) { vtoa = VpcTunnelUtils.getInstance().getTunnelID(ctx); tunnelTable.put(ctx.channel(), vtoa); } msg.addExtField(VpcTunnelUtils.PROPERTY_VTOA_TUNNEL_ID, String.valueOf(vtoa.getVid())); } } } }

 

最終會調用到NettyRemotingAbstract的processResponseCommand,RemotingCommand中根據opaque從responseTable中獲取ResponseFuture,而後一樣也是執行callback,這樣,就實現了整個pullmessage的異步模式

(代碼片斷十)
public
void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { //異步 executeInvokeCallback(responseFuture); //執行回調 } else { //同步 responseFuture.putResponse(cmd); //爲了解除阻塞 responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }

 

咱們再看下同步的方式是如何實現的

 

回顧下代碼片斷六,同步的方式是須要返回PullResult的,換句話說,這種方式是須要在發送的線程中來處理返回結果的

咱們從代碼片斷六跟下去,跟到NettyRemotingAbstract的invokeSyncImpl

(代碼片斷十一)
public
RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }

 

和異步發送的代碼片斷八對比一下,能夠看到,同步方式也要放到responseTable中,這裏就有個疑惑了,既然都同步了,還要放到responseTable中幹什麼呢,繼續往下看,

ChannelFutureListener都是同樣的,若是發送成功就返回了,而後到了最關鍵的一行:

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

這一行顧名思義就是阻塞了,可是也不能一直阻塞住(由於PullMessageService是單線程的,若是由於一個異常就阻塞那就跪了),因此是一個設置了超時時間的阻塞,看下是如何阻塞的

 

ResponseFuture中有這兩個方法,當putResponse的時候,把RemotingCommand賦值,而且countDownLatch.countDown,而在waitResponse的時候countDownLatch.await

(代碼片斷十二)
public
RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }

 

這樣就清晰多了,剩下的疑問就是,在何時putResponse的,有兩個地方:

第一個地方,當拉取消息回來的時候,回顧下代碼片斷十,有一句是(responseFuture.getInvokeCallback() != null),經過剛剛的流程已經知道,只有異步的時候invokeCallback纔不爲null,所以走到else,看到在這個時候responseFuture.putResponse(cmd)和responseFuture.release(),也就是說同步方式也是經過responseTable存儲的方式,來獲取結果,而且經過CountDownLatch來阻塞發送的線程,當收到消息以後再countDown,發送端最終返回PullResult來處理消息

第二個地方,回顧下代碼片斷十一,在ChannelFutureListener中當發送失敗了之後,也會put一個null值:responseFuture.putResponse(null),這裏只是爲了將阻塞放開

 

至此,Rockmq關於pullmessage的同步和異步方式就已經說明白了,總結一下,同步和異步本質上都是「異步」的,由於netty就是一個異步的框架,Rockmq只是利用了CountDownLatch來阻塞住發送端線程來實現了「同步」的效果,

經過一個responseTable來緩存住發送出去的請求,等收到的時候從這個緩存裏按對應關係取出來,再去作對應的consumer線程的消息處理

相關文章
相關標籤/搜索