dubbo請求處理線程模型實現分析

問題的由來:java

若是事件處理的邏輯能迅速完成,而且不會發起新的 IO 請求,好比只是在內存中記個標識,
則直接在 IO 線程上處理更快,由於減小了線程池調度。數據庫

但若是事件處理邏輯較慢,或者須要發起新的 IO 請求,好比須要查詢數據庫,則必須派發到
線程池,不然 IO 線程阻塞,將致使不能接收其它請求。api

若是用 IO 線程處理事件,又在事件處理過程當中發起新的 IO 請求,好比在鏈接事件中發起登
錄請求,會報「可能引起死鎖」異常,但不會真死鎖。app

所以,須要經過不一樣的派發策略和不一樣的線程池配置的組合來應對不一樣的場景。異步

這裏說的IO線程(以netty爲例)是netty啓動服務時指定的boss/worker執行器中的woker線程。ide

具體配置方式以下兩種:函數

<dubbo:provider dispatcher="xxx" />
或者
<dubbo:protocol name="dubbo" dispatcher="xxx" />源碼分析

目前dubbo提供的Dispatcher擴展實現有以下5種實現,默認派發方式是allui

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

在分析源碼以前,這裏再溫習下裝飾模式,由於dubbo從交換層到傳輸層經過裝飾模式完成了多消息的接收處理,心跳,線程派發,消息解碼,請求響應消息的處理邏輯。最外層裝飾總優於裏層的方法的調用。
本文雖然說是要分析線程派發模型,但會從鏈接接處理基本handler開始,層層分析包裹在它外層的裝飾類。this

裝飾模式類關係圖以下

如圖裝飾模式主要包含如下幾種類:
業務接口類,定義要裝飾的業務操做
業務實現類,也就是要被裝飾的類
裝飾類父類,它一樣實現了被裝飾的業務接口,同時它經過構造函數,內部持有一個裝飾接口類型的對象,通常這個對象提供接口方法默認實現。
具體裝飾類,要繼承裝飾類父類,不一樣的裝飾類,能夠重寫父類方式完成具體的裝飾操做。
有時也能夠沒有裝飾類父類,直接有裝飾類實現接口完成裝飾。

接下來就從裝飾模式的角度分析源碼

因爲dispatcher配置是服務端的,這裏從服務暴露流程中分析dubbo的實現。

具體能夠看下DubboProtocol類裏的私有變量

requestHandler,它就是被裝飾的類。

定義以下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        //調用服務端實現方法,返回結果。
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                //獲取暴露的服務代理
                Invoker<?> invoker = getInvoker(channel, inv);
                //若是是callback 須要處理高版本調用低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                //經過代理服務,執行方法
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        //接受消息處理方法
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                //調用消息,調用replay
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        //客戶端鏈接後處理方法
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        //斷開鏈接後處理方法
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }
        //調用過程
        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }
        //從url中建立調用對象
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

它到是個匿名類,經過實現抽象類ExchangeHandlerAdapter定義來實例化獲得,ExchangeHandlerAdapter繼承關係以下

能夠看到它和它的祖先類,實現了ChannelHandler接口5個關鍵方法,鏈接,斷開鏈接,發送消息,接受消息和異常處理方法。也是rpc調用的經常使用處理方法。
同時也是線程派發處理關注的方法。

因此ChannelHandler就是,裝飾模式裏的業務接口類。


接下來,就是找裝飾類的過程了。
能夠找到requestHandler對象第一被使用是在DubboProtocol的createServer方法中

try {
    //Exchangers是門面類,裏面封裝了具體交換層實現
    server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//跟到Exchangers.bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
	//經過spi會走HeaderExchanger的bind邏輯
        return getExchanger(url).bind(url, handler);
    }
//HeaderExchanger的bind方法

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //能夠看到這時原始handler第一被裝飾
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

HeaderExchangeHandler裝飾類

能夠看下類的繼承關係

能夠看到HeaderExchangeHandler實現了ChannelHandler接口,符合裝飾模式要求。

看下它的構造函數:

public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.handler = handler;
    }

這裏的ExchangeHandler是ChannelHandler子接口,符合裝飾模式經過構造函數持有接口類型對象引用。
下面看下它對主要幾個rpc方法的裝飾實現:

//鏈接處理邏輯
    public void connected(Channel channel) throws RemotingException {
        //添加一些心跳時間參數
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            //經過被包裝類對應方法處理
            handler.connected(exchangeChannel);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
   //斷開邏輯,
    public void disconnected(Channel channel) throws RemotingException {
         //添加一些心跳時間參數
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
	  //經過被包裝類對應方法處理
            handler.disconnected(exchangeChannel);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    //發送數據
    public void sent(Channel channel, Object message) throws RemotingException {
        Throwable exception = null;
        try {
            channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
	      //調用被包裝類對應方法處理。
                handler.sent(exchangeChannel, message);
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        } catch (Throwable t) {
            exception = t;
        }
        //發送消息,如果請求消息,有個異步發送重試邏輯
        if (message instanceof Request) {
            Request request = (Request) message;
            DefaultFuture.sent(channel, request);
        }
        if (exception != null) {
            if (exception instanceof RuntimeException) {
                throw (RuntimeException) exception;
            } else if (exception instanceof RemotingException) {
                throw (RemotingException) exception;
            } else {
                throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
                        exception.getMessage(), exception);
            }
        }
    }

    /***
     * 接受請求數據,經過handleRequest方法處理後獲得處理結果。
     * @param channel channel.
     * @param message message.
     * @throws RemotingException
     */
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {//是往返消息,調用私有方法handleRequest處理獲得結果
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {//不須要回復的消息調用
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
	        //處理響應消息的邏輯
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    //異常處理
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        if (exception instanceof ExecutionException) {
            ExecutionException e = (ExecutionException) exception;
            Object msg = e.getRequest();
            if (msg instanceof Request) {
                Request req = (Request) msg;
                if (req.isTwoWay() && !req.isHeartbeat()) {//有往返要求,就回復消息
                    Response res = new Response(req.getId(), req.getVersion());
                    res.setStatus(Response.SERVER_ERROR);
                    res.setErrorMessage(StringUtils.toString(e));
                    channel.send(res);
                    return;
                }
            }
        }
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            //調用對應被裝飾類方法
            handler.caught(exchangeChannel, exception);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

HeaderExchangeHandler裝飾類,在Request/Response層面定義了請求響應消息的處理邏輯。

第二個裝飾類DecodeHandler

經過源碼能夠看到DecodeHandler它和它的父類AbstractChannelHandlerDelegate共同完成了對ChannelHandler接口方法的裝飾,看下DecodeHandler具體裝飾的received方法:

public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

DecodeHandler類如它的名稱,主要經過對received方法的裝飾處理,完成完成消息解碼的處理。

接着

return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));

這句繼續跟蹤方法

//Transporters.bind方法
   public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
	//根據spi 這裏具體走NettyTransporter.bind方法
        return getTransporter().bind(url, handler);
    }

    //NettyTransporter的bind方法
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
       //這裏是建立NettyServer實例
        return new NettyServer(url, listener);
    }
    //NettyServer構造器
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
       //這裏看下ChannelHandlers.wrap方法
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

   //ChannelHandlers.wrap方法
   public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        //調用內部wrapInternal方法
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

   //wrapInternal方法
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    //這裏終於看到經過spi獲取Dispatcher實現的代碼
    //還能看到經過Dispatcher.dispatch方法返回的handler後又通過了兩層裝飾,HeartbeatHandler而後MultiMessageHandler類
      return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
	.getAdaptiveExtension().dispatch(handler, url)));
    }

這裏再分析下ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()的代碼實現:

public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher {
    public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
        if (arg1 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        //默認是all實現方案
        String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])");
        com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
        //調用接口Dispatcher實現的dispatch方法返回ChannelHandler對象
        return extension.dispatch(arg0, arg1);
    }
}

下面就具體對照用戶手冊上關於派發實現的說明,分別對照源碼分析下:

1, all實現,用戶手冊說,全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件,心跳等。

看下實現類AllDispatcher

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
          //使用AllChannelHandler類實現
        return new AllChannelHandler(handler, url);
    }

}

AllChannelHandler類,經過類結構能夠看到它也是ChannelHandler的裝飾類。

裝飾類結構清晰。經過代碼可知,其餘幾種線程分派模型實現裝飾類,都遵循一樣的繼承機構,都會繼承

WrappedChannelHandler

看下它對裝飾方法的實現

//鏈接事件放入線程池
 public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
 //斷開事件鏈接放入線程池
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
 //接受請求(包含回覆消息處理)消息放入線程池
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO 臨時解決線程池滿後異常信息沒法發送到對端的問題。待重構
            //fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
   //異常處理線程池
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

經過實現看到,它把全部操做都放入了線程池中執行。可是心跳消息的接受和發送沒有進入線程池。

2,direct 分配實現,文檔上說,全部消息都不派發到線程池,所有在 IO 線程上直接執行。

實現類DirectDispatcher

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
       //直接返回原生的handler不進行另外裝飾
        return handler;
    }

}

如它文檔所說同樣,全部消息處理不派發線程池。

3,message 手冊上說,只有請求響應消息派發到線程池,其它鏈接斷開事件,心跳等消息,直接在 IO 線程上執行。

實現類MessageOnlyDispatcher

public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        //經過MessageOnlyChannelHandler裝飾類處理
        return new MessageOnlyChannelHandler(handler, url);
    }

}

這裏在貼下它的繼承圖:

具體裝飾實現
 

public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    //接收請求(包括響應)消息放在線程池。
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}

如文檔所說,只有請求(響應發送)消息放入線程池執行。

4,execution 手冊上說,只請求消息派發到線程池,不含響應,響應和其它鏈接斷開事件,心跳等消息,直接在 IO 線程上執行。

實現類ExecutionDispatcher

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
       //經過裝飾類ExecutionChannelHandler實現
        return new ExecutionChannelHandler(handler, url);
    }

}

實現以下:

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    //鏈接事件放入線程池
    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    }
   //斷開事件鏈接放入線程池
    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    }
    //消息接受(響應消息發送)放入線程池
    public void received(Channel channel, Object message) throws RemotingException {
    	try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO 臨時解決線程池滿後異常信息沒法發送到對端的問題。待重構
            //fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時
        	if(message instanceof Request &&
        			t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side("+url.getIp()+","+url.getPort()+") threadpool is exhausted ,detail msg:"+t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
   //異常消息放入線程池
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    }

}

經過實現能夠看到,它同all派發實現同樣,並非只有請求放入線程池。這個手冊上說的不同,手冊有誤,仍是沒有實現!!

5,connection實現,手冊說,在 IO 線程上,將鏈接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池。

實現類ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
       //裝飾類ConnectionOrderedChannelHandler實現
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}

ConnectionOrderedChannelHandler實現:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        //經過定義只有一個線程的線程池,保證執行的順序
        //用LinkedBlockingQueue保存待處理的任務
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME 沒有地方釋放connectExecutor!
        //這是等待隊列報警大小
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

     //鏈接事件放入隊列
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    //斷開事件放入隊列
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
   //放入線程池
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	//fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    //放入線程池
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}

經過代碼分析,能夠看到本實現如文檔說的同樣把鏈接斷事件處理放入隊列,有序執行,其餘放入線程池。

以上就是具體線程派發模型的分析。

最後再看下上面提到的最後兩個裝飾類,

HeartbeatHandler裝飾類

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";

    public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    public HeartbeatHandler(ChannelHandler handler) {
        super(handler);
    }

    public void connected(Channel channel) throws RemotingException {
        setReadTimestamp(channel);
        setWriteTimestamp(channel);
        handler.connected(channel);
    }

    public void disconnected(Channel channel) throws RemotingException {
        clearReadTimestamp(channel);
        clearWriteTimestamp(channel);
        handler.disconnected(channel);
    }

    public void sent(Channel channel, Object message) throws RemotingException {
        setWriteTimestamp(channel);
        handler.sent(channel, message);
    }

    /***
     * 心跳消息的接受和發送
     * 
     * @param channel
     * @param message
     * @throws RemotingException
     */
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(Response.HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                        new StringBuilder(32)
                                .append("Receive heartbeat response in thread ")
                                .append(Thread.currentThread().getName())
                                .toString());
            }
            return;
        }
	//非心跳消息的接受,走派發裝飾類
        handler.received(channel, message);
    }
}

能夠看到HeartbeatHandler對received方法進行了處理,因此消息的接受和發送是不會派發到線程池的。

MultiMessageHandler裝飾類

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        //多個消息類型時,循環接受
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
}

此裝飾類,主要完成多消息類型的循環解析接收。

因此到了NettyServer類,原始的handler已經通過的5層的裝飾。
這裏在其父類AbstractServer的構造方法中加斷點,截圖看下handler對象圖

能夠印證。

相關文章
相關標籤/搜索