rocketmq源碼解析namesrvController啓動③

說在前面apache

接着上面的介紹namesrvController啓動緩存

 

源碼解析微信

返回方法,處理請求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand異步








public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {//        公平的處理請求final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {//                        客戶自定義的鉤子實現類RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();if (rpcHook != null) {//                            這裏mq提供了一些鉤子方法能夠擴展的地方,請求前處理邏輯能夠在這裏擴展rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);}//                        處理請求,有各個實現,主要都是netty通訊 =》TODOfinal RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);if (rpcHook != null) {//                            執行rocketmq請求的後置處理鉤子方法rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);}//                        若是不是單線請求if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};//            系統繁忙,暫時啓動流量控制if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);//                異步提交請求pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}//                若是不是單途,系統繁忙,暫時啓動流量控制if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";//            請求編碼不支持final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}    }

進入方法,執行rocketmq請求的後置處理鉤子方法,org.apache.rocketmq.remoting.RPCHook#doAfterResponse 用戶須要實現這個鉤子方法。ide

返回方法,響應消息處理,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand函數


public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();//        從響應緩存信息中獲取responseFuturefinal ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);responseTable.remove(opaque);if (responseFuture.getInvokeCallback() != null) {//                若是響應對象有回調處理就處理回調函數=》executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}    }

進入方法,若是響應對象有回調處理就處理回調函數,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#executeInvokeCallbackthis

private void executeInvokeCallback(final ResponseFuture responseFuture) {boolean runInThisThread = false;ExecutorService executor = this.getCallbackExecutor();if (executor != null) {try {executor.submit(new Runnable() {@Overridepublic void run() {try {//                            異步執行回調處理=》responseFuture.executeInvokeCallback();} catch (Throwable e) {log.warn("execute callback in executor exception, and callback throw", e);} finally {//                            釋放信號=》responseFuture.release();}}});} catch (Exception e) {runInThisThread = true;log.warn("execute callback in executor exception, maybe executor busy", e);}} else {runInThisThread = true;}if (runInThisThread) {try {responseFuture.executeInvokeCallback();} catch (Throwable e) {log.warn("executeInvokeCallback Exception", e);} finally {responseFuture.release();}}    }

進入方法,異步執行回調處理,org.apache.rocketmq.remoting.netty.ResponseFuture#executeInvokeCallback編碼

public void executeInvokeCallback() {if (invokeCallback != null) {//            自旋鎖實現if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {//                執行響應回調=》invokeCallback.operationComplete(this);}}    }

進入方法,org.apache.rocketmq.remoting.InvokeCallback#operationComplete 用戶須要實現這個方法。.net

 

說在最後netty

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索