dubbo經過netty將請求發送到provider的時候,provider以前已經啓動好的NettyServer監聽指定端口的時候會收到來自consumer的請求,將經過網絡發送來的二進制編碼成Request交給上層處理。dubbo從Request中取出調用信息,找到以前的Invoker,而後通過filter,最後經過代理調用到提供服務的方法。java
provider處理請求的調用堆棧以下:網絡
sayHe110:18, TestDubb0Servicelmpl (com.test.service.impl) invokeMethod:-1, Wrapper1 (com. alibabadubbo. common.bytecode) dolnvoke:46, JavassistProxyFactory$1 (com.alibaba.dubbo.rpc.proxy.javassist) invoke:72, AbstractProxylnvoker (com.alibaba.dubbo.rpc.proxy) invoke:53, InvokerWrapper (com.alibaba.dubbo.rpc.protocol) invoke:64, ExceptionFilter .com alibaba.dubbo.rpc filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:64, MonitorFilter .com alibaba.dubbo. monitor.support) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:42, TimeoutFilter .com alibaba.dubbo. rpc.filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:49, TokenFilter .com alibaba.dubbo. roc. filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:78, TraceFilter .com alibaba dubbo. roc. protocol.dubbo.filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:60, ContextFilter .com alibaba.dubbo. roc. filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:132, GenericFilter .com alibaba.dubbo. roc. filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:38, ClassLoaderFilter .com alibaba dubbo.rpc.filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:38, EchoFilter .com alibaba dubbo. rpc filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) reply:108, DubboProtocol$1 .com alibaba dubbo.rpcprotocol.dubbo) handleRequest:86, HeaderExchangeHandler (com.alibaba.dubbo.remoting.exchange.support.header) received:172, HeaderExchangeHandler (com.alibaba dubbo. remoting. exchange.support.header) received:52, DecodeHandler (com.alibaba dubbo.remoting. transport) run:82, ChannelEventRunnable (com.alibaba.dubbo.remoting.transport.dispatcher) runWorker:1142, ThreadPoolExecutor (java.util.concurrent) run:617, ThreadPoolExecutor$Worker (java.util.concurrent) run:745, Thread (java.lang)
從調用堆棧基本能夠看出provider整個處理請求的過程,比較簡單,可是須要知道爲何調用過程是這樣的?其中關鍵類是何時在初始化的?怎麼初始化的?app
接下來解決一下問題:socket
- 爲何是從ChannelEventRunnable開始的?誰初始化的ChannelEventRunnable?ChannelEventRunnable做用是什麼?
- 爲何會調用到上面堆棧中的幾個handler(也就是handler是怎麼初始化的)?
- filter鏈怎麼初始化的?
原本這些問題在export的時候若是仔細查看源碼已經能夠解決了,可是真正用到的時候是處理請求的時候,因此這裏算是補上以前export過程的一些關鍵步驟。ide
ChannelEventRunnable初始化
上面的調用堆棧中,是在線程池中一個單獨的線程來處理請求,因此先從線程池中調用的線程開始,ChannelEventRunnable的構造過程。oop
接着前面provider export的時候會啓動NettyServer,因此ChannelEventRunnable的建立也從NettyServer的啓動提及,ChannelEventRunnable被初始化的過程會涉及到netty的部份內容:ui
- NettyServer#doOpen,NettyServer啓動的時候會建立NioServerSocketChannelFactory,該factory負責建立netty放入全部channel
- 在NioServerSocketChannelFactory構造方法中會初始化NioWorkerPool,在該類的構造方法中建立NioWorker
- 在建立NioWorker的過程當中,調用超類AbstractNioSelector的構造方法
// NioWorker構造方法中會調用超類AbstractNioSelector的構造方法 AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) { this.executor = executor; openSelector(determiner); } // org.jboss.netty.channel.socket.nio.AbstractNioSelector#openSelector private void openSelector(ThreadNameDeterminer determiner) { try { // open selector selector = SelectorUtil.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { // new一個thread,將當前初始化的NioWorker做爲入參,也就是說最終要運行的是NioWorker.run // 這個start方法裏面會將新建的這個線程放到線程池中運行 // 這裏的executor就是new NioServerSocketChannelFactory時候的入參worker,也就是worker線程池 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner)); success = true; } finally { // 省略中間代碼... } assert selector != null && selector.isOpen(); } // org.jboss.netty.channel.socket.nio.AbstractNioWorker#newThreadRenamingRunnable @Override protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) { // 這裏的this就是初始化的NioWorker return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner); } // org.jboss.netty.channel.socket.nio.NioWorker#run @Override public void run() { // 上面DeadLockProofWorker.start裏面啓動的線程會調用這個run方法 // 這裏調用了超類的run方法,最終會調用到org.jboss.netty.channel.socket.nio.AbstractNioSelector#run // AbstractNioSelector#run super.run(); recvBufferPool.releaseExternalResources(); } // AbstractNioSelector#run // 這個方法是NioWorker真正處理邏輯的地方,死循環調用select接受IO事件,而後處理 public void run() { thread = Thread.currentThread(); int selectReturnsImmediately = 0; Selector selector = this.selector; if (selector == null) { return; } // use 80% of the timeout for measure final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; boolean wakenupFromLoop = false; for (;;) { wakenUp.set(false); try { long beforeSelect = System.nanoTime(); // 監聽I/O事件發生 int selected = select(selector); // 省略中間代碼... if (shutdown) { // 省略中間代碼... } else { // 處理I/O事件 process(selector); } } catch (Throwable t) { // 省略中間代碼... } } }
接下來到初始化ChannelEventRunnable的調用堆棧this
終於到了ChannelEventRunnable開始初始化的地方,全部的ChannelEventRunnable都是在AllChannelHandler中完成初始化,並加入到線程池中執行,下面以收到connect事件爲例編碼
public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ // 初始化ChannelEventRunnable並將其加入線程池 // 這裏的線程池是com.alibaba.dubbo.common.threadpool.ThreadPool這個擴展,默認配置的是"fixed",也就是FixedThreadPool cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); }catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t); } }
處理請求
上面最終啓動了ChannelEventRunnable線程,在這個線程中會最終調用到咱們的SayHello方法中,這個類負責分類處理各類接收到的I/O事件url
// com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run public void run() { switch (state) { case CONNECTED: try{ // 接收到鏈接 handler.connected(channel); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try{ // 鏈接斷開 handler.disconnected(channel); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try{ // 發送數據 handler.sent(channel,message); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e); } break; case RECEIVED: try{ // 收到數據 handler.received(channel, message); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e); } break; case CAUGHT: try{ // 處理異常 handler.caught(channel, exception); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel + ", message is: " + message + ", exception is " + exception,e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } }
上面經過調用handler的相關方法來處理的,接下來看看handler是什麼?
handler初始化
從最上面的調用堆棧裏面有這些handler
com.alibaba.dubbo.remoting.transport.DecodeHandler#DecodeHandler com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler // 最上面調用堆棧中com alibaba dubbo.rpcprotocol.dubbo.DubboProtocol$1.reply其實就是線面這個接口的實現類 com.alibaba.dubbo.remoting.exchange.ExchangeHandler
以前在dubbo export中說過啓動NettyServer的調用堆棧,可是並無詳細看每個調用方法,這裏把相關重要的方法拿出來
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { // 這些請求received、connected、disconnected最終都會調用下面這個方法處理 public Object reply(ExchangeChannel channel, Object message) throws RemotingException { // 省略中間代碼... } // 省略中間代碼... } // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer private ExchangeServer createServer(URL url) { // 省略中間代碼... // 這裏的handler就是上面初始化的,是一個匿名內部類,也就是com.alibaba.dubbo.remoting.exchange.ExchangeHandler的實現類 server = Exchangers.bind(url, requestHandler); // 省略中間代碼... return server; } // com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger#bind public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 這裏的handler就是上面bind方法傳入的requestHandler // 因此這裏就是初始化DecodeHandler和HeaderExchangeHandler的地方,也就說傳入Transporters.bind方法的是DecodeHandler類型 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
ChannelEventRunnable中的handler是什麼類型?
從最上面的堆棧已經知道這個handler其實就是DecodeHandler,也就是初始化ChannelEventRunnable的時候傳入的handler,接下來須要弄清楚的是爲何是DecodeHandler。
上面剛說過ChannelEventRunnable的初始化是由AllChannelHandler中的某一個方法初始化的,那麼做爲構造參數傳入ChannelEventRunnable的handler也就是WrappedChannelHandler#handler(這個類是AllChannelHandler的超類),如今要找到AllChannelHandler是怎麼初始化的。
// com.alibaba.dubbo.remoting.transport.netty.NettyServer#NettyServer // 上面說handler的初始化的時候,Transporters.bind方法會最終調用NettyServer的構造方法 public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ // 這裏的handler就是DecodeHandler super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrap public static ChannelHandler wrap(ChannelHandler handler, URL url){ // 這裏的handler是DecodeHandler return ChannelHandlers.getInstance().wrapInternal(handler, url); } // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrapInternal protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 這裏的handler是DecodeHandler // 先獲取Dispatcher的擴展類,默認是com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher // 而後調用AllDispatcher.dispatch方法 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); } // com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher#dispatch public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 這裏的handler是DecodeHandler,因此AllChannelHandler的超類WrappedChannelHandler#handler就是DecodeHandler return new AllChannelHandler(handler, url); }
也就是ChannelEventRunnable中的handler就是HeaderExchanger#bind方法中new出來的DecodeHandler類型的對象
filter鏈構造
filter鏈的構造原本也是在provider export服務的時候完成的,同理consumer端是在refer服務的時候完成filter鏈的構造。
consumer和provider的filter鏈都是在下面的類中構造的,查看前面的service_export和service_reference的調用堆棧就能夠看到對該類的調用。
// com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } // service export的時候調用 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 先構造filter鏈再繼續後面的export return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } // consumer refer的仍是調用 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } // 這裏是先refer調用建立DubboInvoker,而後才構造filter鏈,由於consumer是先通過filter鏈,再通過DubboInvoker處理,而provider是先通過DubboProtocol處理,而後調用filter鏈 return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } public void destroy() { protocol.destroy(); } // private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 獲取全部符合條件的filter擴展,條件包括 // 1. filter擴展類上面group對應的值和要求的group(入參)一致 // 2. url中也能夠指定加載的filter或者剔除的filter,url配置的key就是入參的key List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; // 每一個filter使用一個Invoker包裹 last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { // 將next傳入,在filter負責調用,由此構成鏈 return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } }
因此如今返回看最前面的調用堆棧一切應該是瓜熟蒂落了,netty接收到I/O請求後,通知到NioWorker,在NioWorker線程中通過pipeline的處理後啓動了ChannelEventRunnable線程;在ChannelEventRunnable線程線程中根據接收到的不一樣事件調用handler的不一樣方法來處理,通過多個handler處理以後,通過的是filter鏈,最後會調用到咱們編寫的service方法。執行完咱們的方法以後,dubo會將結果經過netty發送給consumer。
總結
上面經過提問題的方式,解讀了一些閱讀源碼中的關鍵代碼,如今將service export和service reply結合起來,再去閱讀源代碼就就本能讀懂全部主流程了,就能明白源代碼爲何這麼寫。