說在前面apache
接上次promise
源碼解析微信
netty鏈接管理handlerapp
class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress); final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); // 建立netty鏈接 super.connect(ctx, remoteAddress, localAddress, promise); if (NettyRemotingClient.this.channelEventListener != null) { // 發佈netty事件,阻塞隊列實現同步=》 NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel())); } } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress); // 關閉鏈接=》 closeChannel(ctx.channel()); super.disconnect(ctx, promise); if (NettyRemotingClient.this.channelEventListener != null) { // 發佈關閉鏈接事件=》 NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); // =》 closeChannel(ctx.channel()); super.close(ctx, promise); // 快速失敗=》 NettyRemotingClient.this.failFast(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { // 發佈鏈接關閉事件 NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress); // 超過空閒時間,關閉channel=》 closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { // 發佈channel空閒超時事件=》 NettyRemotingClient.this .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } ctx.fireUserEventTriggered(evt); }
發佈事件這裏用阻塞隊列實現異步,進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.NettyEventExecutor#putNettyEvent異步
public void putNettyEvent(final NettyEvent event) { if (this.eventQueue.size() <= maxSize) { this.eventQueue.add(event); } else { log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); } }
往上返回進入這個方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#closeChannel(io.netty.channel.Channel) 關閉鏈接ide
public void closeChannel(final Channel channel) { if (null == channel) return; try { if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { boolean removeItemFromTable = true; ChannelWrapper prevCW = null; String addrRemote = null; // 遍歷channel列表 for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) { String key = entry.getKey(); ChannelWrapper prev = entry.getValue(); if (prev.getChannel() != null) { if (prev.getChannel() == channel) { prevCW = prev; addrRemote = key; break; } } } if (null == prevCW) { log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote); removeItemFromTable = false; } if (removeItemFromTable) { // 刪除channel this.channelTables.remove(addrRemote); log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); RemotingUtil.closeChannel(channel); } } catch (Exception e) { log.error("closeChannel: close the channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.error("closeChannel exception", e); } }
往上返回進入這個方法,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#failFast,channel關閉後對這個channel的請求快快速失敗this
protected void failFast(final Channel channel) { Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator(); while (it.hasNext()) { Entry<Integer, ResponseFuture> entry = it.next(); if (entry.getValue().getProcessChannel() == channel) { Integer opaque = entry.getKey(); if (opaque != null) { // 請求失敗=》 requestFail(opaque); } } } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#requestFail.net
private void requestFail(final int opaque) { ResponseFuture responseFuture = responseTable.remove(opaque); if (responseFuture != null) { responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); try { // 執行響應回調=》 executeInvokeCallback(responseFuture); } catch (Throwable e) { log.warn("execute callback in requestFail, and callback throw", e); } finally { responseFuture.release(); } } }
進入這個方法,異步執行回調處理org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#executeInvokeCallback3d
private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { executor.submit(new Runnable() { @Override public void run() { try { // 異步執行回調處理=》 responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); } finally { responseFuture.release(); } } }); } catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; } if (runInThisThread) { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); } finally { responseFuture.release(); } } }
進入這個方法org.apache.rocketmq.remoting.netty.ResponseFuture#executeInvokeCallbacknetty
public void executeInvokeCallback() { if (invokeCallback != null) { // 自旋鎖實現 if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { // 執行響應回調=》 invokeCallback.operationComplete(this); } } }
nettt client處理handler,NettyClientHandler
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { // 消息讀取=》 processMessageReceived(ctx, msg); } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: // 請求消息處理 =》 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: // 響應消息處理 processResponseCommand(ctx, cmd); break; default: break; } } }
進入這個方法請求處理org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 公平的處理請求 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { // 客戶自定義的鉤子實現類 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { // 這裏mq提供了一些鉤子方法能夠擴展的地方,請求前處理邏輯能夠在這裏擴展 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } // 處理請求,有各個實現,主要都是netty通訊 =》TODO 文章 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { // 執行rocketmq請求的後置處理鉤子方法 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } // 若是不是單線請求 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
未完待續。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣